Annotation of freem/src/transact.c, revision 1.2

1.1       snw         1: /*
                      2:  *                            *
                      3:  *                           * *
                      4:  *                          *   *
                      5:  *                     ***************
                      6:  *                      * *       * *
                      7:  *                       *  MUMPS  *
                      8:  *                      * *       * *
                      9:  *                     ***************
                     10:  *                          *   *
                     11:  *                           * *
                     12:  *                            *
                     13:  *
                     14:  *   transact.c
                     15:  *    FreeM transaction processing support
                     16:  *
                     17:  *  
                     18:  *   Author: Serena Willis <jpw@coherent-logic.com>
                     19:  *    Copyright (C) 1998 MUG Deutschland
                     20:  *    Copyright (C) 2020 Coherent Logic Development LLC
                     21:  *
                     22:  *
                     23:  *   This file is part of FreeM.
                     24:  *
                     25:  *   FreeM is free software: you can redistribute it and/or modify
                     26:  *   it under the terms of the GNU Affero Public License as published by
                     27:  *   the Free Software Foundation, either version 3 of the License, or
                     28:  *   (at your option) any later version.
                     29:  *
                     30:  *   FreeM is distributed in the hope that it will be useful,
                     31:  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
                     32:  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
                     33:  *   GNU Affero Public License for more details.
                     34:  *
                     35:  *   You should have received a copy of the GNU Affero Public License
                     36:  *   along with FreeM.  If not, see <https://www.gnu.org/licenses/>.
                     37:  *
                     38:  **/
                     39: 
                     40: #include <string.h>
                     41: #include <stddef.h>
                     42: #include <stdio.h>
                     43: #include <stdlib.h>
                     44: #include <unistd.h>
                     45: #include <errno.h>
                     46: 
                     47: #include "mpsdef.h"
                     48: #include "transact.h"
                     49: #include "iftab.h"
                     50: #include "journal.h"
                     51: #include "shmmgr.h"
                     52: #include "mref.h"
                     53: #include "tp_check.h"
                     54: 
                     55: #define FALSE   0
                     56: #define TRUE    1
                     57: 
                     58: #if !defined(__OpenBSD__) && !defined(__APPLE__)
                     59: union semun {
                     60:     int              val;    /* Value for SETVAL */
                     61:     struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
                     62:     unsigned short  *array;  /* Array for GETALL, SETALL */
                     63:     struct seminfo  *__buf;  /* Buffer for IPC_INFO
                     64:                                 (Linux-specific) */
                     65: };
                     66: #endif
                     67: 
                     68: void m_log (int, const char *);
                     69: 
                     70: int semid_tp;
                     71: int tp_committing = FALSE;
                     72: int tp_level = 0;
                     73: tp_transaction transactions[TP_MAX_NEST];
                     74: 
                     75: 
                     76: void tp_init(void)
                     77: {
                     78:     union semun arg;
                     79:     char err[255];
                     80:     key_t tp_sk;
                     81: 
                     82:     tp_sk = ftok (config_file, 4);
                     83:     
                     84:     if (first_process) {
                     85: 
                     86:         semid_tp = semget (tp_sk, 1, 0666 | IPC_CREAT);
                     87:         if (semid_tp == -1) {
                     88:             fprintf (stderr, "tp_init:  failed to create transaction processing semaphore [errno %d]\r\n", errno);
                     89:             exit (1);
                     90:         }
                     91: 
                     92:         arg.val = 1;
                     93:         if (semctl (semid_tp, 0, SETVAL, arg) == -1) {
                     94:             fprintf (stderr, "tp_init:  failed to initialize transaction processing semaphore\r\n");
                     95:             exit (1);
                     96:         }
                     97: 
                     98:     }
                     99:     else {
                    100:         
                    101:         semid_tp = semget (tp_sk, 1, 0);
                    102:         if (semid_tp == -1) {
                    103:             fprintf (stderr, "tp_init:  could not attach to transaction processing semaphore [errno %d]\r\n", errno);
                    104:             exit (1);
                    105:         }
                    106: 
                    107:     }
                    108: 
                    109:     return;
                    110: 
                    111: }
                    112: 
                    113: short tp_get_sem(void)
                    114: {
                    115:     int tries;
                    116:     struct sembuf s = {0, -1, 0};
                    117: 
                    118:     char msgbuf[100];
                    119: 
                    120:     snprintf (msgbuf, 99, "tp_get_sem:  process %d attempting to acquire transaction processing semaphore", pid);
                    121:     m_log (1, msgbuf);
                    122:     
                    123:     
                    124:     /* our process already owns the semaphore */
                    125:     if (shm_config->hdr->tp_owner == pid) {
                    126: 
                    127:         snprintf (msgbuf, 99, "tp_get_sem:  process %d increments transaction processing semaphore counter", pid);
                    128:         m_log (1, msgbuf);
                    129:     
                    130:         
                    131:         if (first_process == TRUE) {
                    132:             fprintf (stderr, "tp_get_sem:  daemon process increments critical section counter\r\n");
                    133:         }
                    134: 
                    135: 
                    136:         shm_config->hdr->tp_semctr++;
                    137: 
                    138:         return TRUE;
                    139:     }
                    140: 
                    141:     if (first_process == TRUE) {
                    142:         fprintf (stderr, "tp_get_sem:  daemon process enters critical section\r\n");
                    143:     }
                    144: 
                    145:     
                    146:     for (tries = 0; tries < 10; tries++) {
                    147: 
                    148:         if (semop (semid_tp, &s, 1) != -1) {
                    149:             shm_config->hdr->tp_owner = pid;
                    150:             shm_config->hdr->tp_semctr = 1;
                    151: 
                    152:             snprintf (msgbuf, 99, "tp_get_sem:  process %d takes transaction processing semaphore", pid);
                    153:             m_log (1, msgbuf);
                    154:     
                    155:             
                    156:             if (first_process == TRUE) {
                    157:                 fprintf (stderr, "tp_get_sem:  daemon process takes transaction processing semaphore\r\n");
                    158:             }
                    159: 
                    160:             return TRUE;
                    161:         }
                    162: 
                    163:         snprintf (msgbuf, 99, "tp_get_sem:  process %d attempting to acquire transaction processing semaphore (tries = %d)", pid, tries);
                    164:         m_log (1, msgbuf);
                    165:     
                    166: 
                    167:         sleep (1);
                    168: 
                    169:     }
                    170: 
                    171:     return FALSE;
                    172:     
                    173: }
                    174: 
                    175: void tp_release_sem(void)
                    176: {
                    177: 
                    178:     char msgbuf[100];
                    179: 
                    180:     if (shm_config->hdr->tp_semctr == 1) {
                    181: 
                    182:         struct sembuf s = {0, 1, 0};
                    183: 
                    184:         if (first_process == TRUE) {
                    185:             fprintf (stderr, "tp_release_sem:  daemon process leaves critical section\r\n");
                    186:         }
                    187: 
                    188:         
                    189:         shm_config->hdr->tp_semctr = 0;
                    190:         shm_config->hdr->tp_owner = 0;
                    191: 
                    192:         if (first_process == TRUE) {
                    193:             fprintf (stderr, "tp_release_sem:  daemon process relinquishes transaction processing semaphore\r\n");
                    194:         }
                    195: 
                    196: 
                    197:         snprintf (msgbuf, 99, "tp_get_sem:  process %d releases transaction processing semaphore", pid);
                    198:         m_log (1, msgbuf);
                    199: 
                    200:         
                    201:         semop (semid_tp, &s, 1);
                    202:         
                    203:     }
                    204:     else {
                    205: 
                    206:         if (first_process == TRUE) {
                    207:             fprintf (stderr, "tp_release_sem:  daemon process decrements critical section counter\r\n");
                    208:         }
                    209:         
                    210:         snprintf (msgbuf, 99, "tp_get_sem:  process %d decrements transaction processing semaphore counter", pid);
                    211:         m_log (1, msgbuf);
                    212:         
                    213:         shm_config->hdr->tp_semctr--;
                    214:     }
                    215: 
                    216:     
                    217: }
                    218: 
                    219: int tp_tstart(char *tp_id, short serial, short restartable, char **sym_save)
                    220: {
                    221:     if (tp_level == TP_MAX_NEST) {
                    222:         char m[256];
                    223: 
                    224:         snprintf (m, 256, "Attempt to exceed TP_MAX_NEST. Transaction aborted.\r\n\201");
                    225:         write_m (m);
                    226: 
                    227:         return FALSE;
                    228:     }
                    229: 
                    230:     if (((serial == TRUE) && (tp_get_sem () == TRUE)) ||
                    231:         (serial == FALSE)) {
                    232: 
                    233:         tp_level++;
                    234:         
                    235:         jnl_ent_write (JNLA_TSTART, "", "");
                    236:     
                    237:         strcpy (transactions[tp_level].tp_id, tp_id);
                    238:         
                    239:         transactions[tp_level].serial = serial;
                    240:         transactions[tp_level].restartable = restartable;
                    241:         
                    242:         transactions[tp_level].opcount = 0;
                    243:       
                    244:         return TRUE;
                    245:         
                    246:     }
                    247:     else {
                    248:         fprintf (stderr, "tp_tstart:  could not get transaction processing semaphore\r\n");
                    249:         exit (1);
                    250:     }
                    251:     
                    252: 
                    253: }
                    254: 
                    255: int tp_add_op(short islock, short action, char *key, char *data)
                    256: {
                    257:     int oc;
                    258:     freem_ref_t *gr;
                    259: 
                    260:     gr = (freem_ref_t *) malloc (sizeof (freem_ref_t));
                    261:     NULLPTRCHK(gr,"tp_add_op");
                    262:     
                    263:     mref_init (gr, MREF_RT_GLOBAL, "");
                    264:     internal_to_mref (gr, key);
                    265:     
                    266:     if (transactions[tp_level].opcount == TP_MAX_OPS) {
                    267:         char m[256];
                    268: 
                    269:         snprintf (m, 256, "Attempt to exceed TP_MAX_OPS at transaction level %d. Rolling back.\r\n\201", tp_level);
                    270:         write_m (m);
                    271: 
                    272:         free (gr);
                    273: 
                    274:         tp_trollback (1);
                    275:         tp_cleanup (1);
                    276: 
                    277:         if (transactions[tp_level].serial == TRUE) {
                    278:             tp_release_sem();
                    279:         }
                    280:         
                    281:         return FALSE;
                    282:     }
                    283:     
                    284:     /* update transaction-in-flight symbol table */
                    285:     switch (action) {
                    286: 
                    287:         case lock_inc:
                    288:         case lock_dec:
                    289:         case lock_old:
                    290:         case set_sym:
                    291:             iftab_insert (action, key, data, tp_level);
                    292:             break;
                    293: 
                    294:         case kill_sym:
                    295:             iftab_kill (key);
                    296:             break;
                    297:             
                    298:     }
                    299: 
                    300:     if (transactions[tp_level].serial == TRUE) {
                    301:         /* mark the global for checkpointing */
                    302:         cptab_insert (tp_level, gr->name);
                    303:     }
                    304: 
                    305:     free (gr);
                    306: 
                    307:     transactions[tp_level].opcount = transactions[tp_level].opcount + 1;
                    308: 
                    309:     oc = transactions[tp_level].opcount;
                    310: 
                    311:     transactions[tp_level].ops[oc].is_lock = islock;
                    312:     transactions[tp_level].ops[oc].action = action;
                    313: 
1.2     ! snw       314:     stcpy ((char *) &transactions[tp_level].ops[oc].key, key);
        !           315:     stcpy ((char *) &transactions[tp_level].ops[oc].data, data);
1.1       snw       316:        
                    317:  
                    318:     return TRUE;
                    319: }
                    320: 
                    321: int tp_tcommit(void)
                    322: {
                    323:     register int i;
                    324:     short is_serial = transactions[tp_level].serial;
                    325:     
                    326:     tp_committing = TRUE;
                    327: 
                    328:     if (is_serial) {
                    329:         /* checkpoint all globals involved in transaction */
                    330:         cptab_precommit (tp_level);
                    331:     }
                    332:     
                    333:     for(i = 1; i <= transactions[tp_level].opcount; i++) {
                    334:         
                    335:         if (transactions[tp_level].ops[i].is_lock == FALSE) {
                    336:             global (transactions[tp_level].ops[i].action, transactions[tp_level].ops[i].key, transactions[tp_level].ops[i].data);
                    337: 
                    338:             if (merr () > OK) goto commit_error;
                    339:                 
                    340:         }
                    341:         
                    342:     }
                    343: 
                    344:     jnl_ent_write (JNLA_TCOMMIT, "\201", "\201");
                    345: 
                    346:     if (is_serial) {
                    347:         cptab_postcommit (tp_level);
                    348:     }
                    349: 
                    350:     goto commit_done;
                    351:     
                    352: commit_error:
                    353:     
                    354:     tp_trollback (1);
                    355: 
                    356: commit_done:
                    357: 
                    358:     tp_cleanup (1);
                    359:     
                    360:     tp_committing = FALSE;
                    361: 
                    362:     if (is_serial) {
                    363:         tp_release_sem ();
                    364:     }
                    365:     
                    366:     return TRUE;
                    367: }
                    368: 
                    369: int tp_cleanup(int levels)
                    370: {
                    371:     register int i;
                    372: 
                    373:     for (i = tp_level; i >= (((tp_level - levels) >= 0) ? tp_level - levels : 0); i--) {
                    374:         iftab_pop_tlevel (i);
                    375:     }
                    376: 
                    377:     tp_level = ((tp_level - levels) >= 0) ? tp_level - levels : 0;
                    378: 
                    379:     return TRUE;
                    380: }
                    381: 
                    382: int tp_trollback(int levels)
                    383: {
                    384:     register int i;
                    385:     register int j;
                    386: 
                    387: //    for (i = 0; i < levels; i++) {
                    388:     for (i = tp_level; i >= (((tp_level - levels) >= 0) ? tp_level - levels : 0); i--) {
                    389:         
                    390:         for (j = 1; j <= transactions[i].opcount; j++) {
                    391:             
                    392:             if (transactions[i].ops[j].is_lock == TRUE) {
                    393:                 locktab_decrement (transactions[i].ops[j].key, -1);
                    394:             }
                    395:             
                    396:         }
                    397: 
                    398:         if (transactions[i].serial == TRUE) {
                    399:             cptab_rollback (i);
                    400:         }
                    401:         
                    402:     }
                    403: 
                    404: 
                    405:     return TRUE;
                    406: }
                    407: 
                    408: int tp_trestart(void)
                    409: {
                    410:     return 0;
                    411: }
                    412: 
                    413: void tp_tdump(void)
                    414: {
                    415: 
                    416:     int i, j;
                    417: 
                    418:     char tkey[256];
                    419:     char tdata[256];
                    420:     char tact[256];
                    421:     
                    422:     set_io (UNIX);
                    423: 
                    424:     if (tp_level == 0) {
                    425:         printf("No transaction is active.\n");
                    426: 
                    427:         return;
                    428:     }
                    429: 
                    430:     for(i = 1; i <= tp_level; i++) {
                    431: 
                    432:         if(i == tp_level) {
                    433:             printf(" $TLEVEL %d*\n", i);
                    434:         }
                    435:         else {
                    436:             printf(" $TLEVEL %d\n", i);
                    437:         }
                    438: 
                    439:         printf("  Operations for Transaction ID: %s [%s%s]\n",
                    440:                transactions[i].tp_id,
                    441:                ((transactions[i].restartable == TRUE) ? "RESTARTABLE" : "NON-RESTARTABLE"),
                    442:                ((transactions[i].serial == TRUE) ? " SERIAL" : " BATCH"));
                    443: 
                    444:         printf ("\n   %-10s%-15s%-15s\n", "OP. NO.", "ACTION", "KEY/DATA");
                    445:         printf ("   %-10s%-15s%-15s\n", "-------", "------", "--------");
                    446:         
                    447:         
                    448:         for(j = 1; j <= transactions[i].opcount; j++) {          
                    449:             stcpy (tkey, transactions[i].ops[j].key);
                    450:             stcnv_m2c (tkey);
                    451:             stcpy (tdata, transactions[i].ops[j].data);
                    452:             stcnv_m2c (tdata);
                    453: 
                    454:             tp_get_op_name (tact, transactions[i].ops[j].action);
                    455: 
                    456:             if (transactions[i].ops[j].action == set_sym) {
                    457:                 printf ("   %-10d%-15s%s=%s\n", j, tact, tkey, tdata);
                    458:             }
                    459:             else {
                    460:                 printf ("   %-10d%-15s%s\n", j, tact, tkey); 
                    461:             }
                    462:             
                    463:         }
                    464: 
                    465:         cptab_dump (i);
                    466:         
                    467:     }
                    468: 
                    469: 
                    470:     set_io (MUMPS);
                    471: }
                    472: 
                    473: void tp_get_op_name(char *buf, const short action)
                    474: {
                    475:     switch (action) {
                    476:         
                    477:         case set_sym:
                    478:             strcpy (buf, "SET");
                    479:             break;
                    480: 
                    481:         case killone:
                    482:         case kill_sym:
                    483:         case kill_all:
                    484:         case killexcl:
                    485:             strcpy (buf, "KILL");
                    486:             break;
                    487: 
                    488:         case new_sym:
                    489:         case new_all:
                    490:         case newexcl:
                    491:             strcpy (buf, "NEW");
                    492:             break;
                    493: 
                    494:         case get_sym:
                    495:             strcpy (buf, "GET");
                    496:             break;
                    497: 
                    498:         case dat:
                    499:             strcpy (buf, "$DATA");
                    500:             break;
                    501: 
                    502:         case fra_order:
                    503:             strcpy (buf, "$ORDER");
                    504:             break;
                    505: 
                    506:         case fra_query:
                    507:         case bigquery:
                    508:             strcpy (buf, "$QUERY");
                    509:             break;
                    510: 
                    511:         case getinc:
                    512:             strcpy (buf, "$INCREMENT");
                    513:             break;
                    514: 
                    515:         case getnext:
                    516:             strcpy (buf, "$NEXT");
                    517:             break;
                    518: 
                    519:         case lock_inc:
                    520:             strcpy (buf, "LOCK (INCR)");
                    521:             break;
                    522: 
                    523:         case lock_old:
                    524:             strcpy (buf, "LOCK (TRAD)");
                    525:             break;
                    526:             
                    527:     }
                    528:             
                    529: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>