import { WebSQLAdapterError } from "./websql_adapter_error.mjs"; import { mapResult } from "./result_mapper.mjs"; import { log } from "../../log.mjs"; class Task { constructor(job, startsTransaction=false, endsTransaction=false) { this._job = job; if (startsTransaction && endsTransaction) { throw new Error("Task cannot start and end a transaction."); } this.startsTransaction = startsTransaction; this.endsTransaction = endsTransaction; this.result = new Promise ((resolve, reject) => { this._resolve = resolve; this._reject = reject; }); } run(tx) { return this._job(tx).then((result) => { this._resolve(result); return result; }, (e) => { this._reject(e); }); } } export class TransactionManager { constructor(db) { this.db = db; this._txCount = 0; this._tasks = []; this._processing = false; } begin() { return this._addTask(new Task((tx) => { return new Promise((resolve, reject) => { if (tx) { reject(new WebSQLAdapterError("BEGIN called with an active " + "transaction. This should not happen")); return; } this.db.transaction((tx) => { resolve(tx); }); }); }, true)).then(() => mapResult()); } sql(sql, args = []) { return this._addTask(new Task((tx) => { return this._executeSql(tx, sql, args); })); } commit() { return this._addTask(new Task((tx) => { return Promise.resolve(mapResult()); }, false, true)); } rollback() { // Hack to manually cause rollback: // Intentionally cause an error with rollbackOnError set to true return this._addTask(new Task((tx) => { return this._executeSql(tx, "", [], true).catch(() => mapResult()); }, false, true)); } async _process() { if (this._processing) { return; } this._processing = true; let tx = null; let keepaliveCount = 0; while (true) { const tasks = this._tasks; this._tasks = []; if (tasks.length) { const promises = []; for (const task of tasks) { const promise = task.run(tx); if (task.startsTransaction) { tx = await promise; this._txCount++; } else { if (task.endsTransaction) { tx = null; keepaliveCount = 0; } promises.push(promise); } } await Promise.all(promises); } else { if (tx) { await this._nop(tx); keepaliveCount++; if (keepaliveCount % 5000 === 0) { log(`Transaction: ${ this._txCount } Keepalive: #${ keepaliveCount }`); } } else { break; } } } this._processing = false; } _addTask(task) { this._tasks.push(task); this._process(); return task.result; } _executeSql(tx, sql, sqlArgs = [], rollbackOnError = false) { return new Promise((resolve, reject) => { if (!tx) { reject(new WebSQLAdapterError("No transaction. This should not be " + " possible")); return; } tx.executeSql(sql, sqlArgs, (tx, result) => { resolve(mapResult(result)); }, (tx, e) => { reject(WebSQLAdapterError.from(e)); return rollbackOnError; }); }); } _nop(tx) { return this._executeSql(tx, "SELECT 1"); } }