You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
156 lines
3.4 KiB
156 lines
3.4 KiB
3 years ago
|
import { WebSQLAdapterError } from "./websql_adapter_error.mjs";
|
||
|
import { mapResult } from "./result_mapper.mjs";
|
||
|
import { log } from "./log.mjs";
|
||
|
|
||
|
|
||
|
/**
 * A single unit of queued work together with a promise for its outcome.
 *
 * The job is executed later via `run()`; callers observe its outcome through
 * `result`, which settles with whatever the job produced or failed with.
 */
class Task {
    /**
     * @param {function(*): (Promise<*>|*)} job - Invoked by `run()` with the
     *     value passed to `run()`; its outcome settles `result`.
     * @param {boolean} [startsTransaction=false] - Marks the task as one that
     *     opens a transaction.
     * @param {boolean} [endsTransaction=false] - Marks the task as one that
     *     closes a transaction.
     * @throws {Error} If both flags are set.
     */
    constructor(job, startsTransaction = false, endsTransaction = false) {
        this._job = job;

        if (startsTransaction && endsTransaction) {
            throw new Error("Task cannot start and end a transaction.");
        }

        this.startsTransaction = startsTransaction;
        this.endsTransaction = endsTransaction;

        // Expose the settle functions so run() can settle `result` later.
        this.result = new Promise((resolve, reject) => {
            this._resolve = resolve;
            this._reject = reject;
        });
    }

    /**
     * Executes the job and settles `result` with its outcome.
     *
     * The job is still invoked synchronously. A job that throws synchronously
     * or returns a plain value is handled like one returning a promise, so
     * `result` always settles.
     *
     * @param {*} tx - Passed through to the job unchanged.
     * @returns {Promise<*>} Never rejects: fulfills with the job's result, or
     *     with `undefined` when the job failed (the failure is reported
     *     through `result` only).
     */
    run(tx) {
        let pending;
        try {
            pending = Promise.resolve(this._job(tx));
        } catch (e) {
            pending = Promise.reject(e);
        }

        return pending.then((result) => {
            this._resolve(result);
            return result;
        }, (e) => {
            this._reject(e);
        });
    }
}
|
||
|
|
||
|
|
||
|
/**
 * Serializes BEGIN / SQL / COMMIT / ROLLBACK requests onto one database handle.
 *
 * Requests are wrapped in tasks and appended to a queue. A single processing
 * loop drains the queue in order and hands the currently open transaction (if
 * any) to each task. While a transaction is open and the queue is empty, the
 * loop keeps issuing a no-op statement against it.
 */
export class TransactionManager {
    /**
     * @param {object} db - Database handle. Only its
     *     `transaction(callback, errorCallback)` method is used here.
     */
    constructor(db) {
        this.db = db;

        this._txCount = 0;
        this._tasks = [];
        this._processing = false;
    }

    /**
     * Queues the opening of a new transaction.
     *
     * @returns {Promise<*>} Fulfills with `mapResult()` once the transaction
     *     is open. Rejects with a WebSQLAdapterError if a transaction is
     *     already active or the database reports an error before the
     *     transaction was opened.
     */
    begin() {
        return this._addTask(new Task((tx) => {
            return new Promise((resolve, reject) => {
                if (tx) {
                    reject(new WebSQLAdapterError("BEGIN called with an active " +
                        "transaction. This should not happen"));
                    return;
                }

                let opened = false;
                this.db.transaction((newTx) => {
                    opened = true;
                    resolve(newTx);
                }, (e) => {
                    // Errors reported after the transaction was handed out
                    // cannot change the outcome of BEGIN any more; only a
                    // failure to open has to settle the promise here.
                    if (!opened) {
                        reject(WebSQLAdapterError.from(e));
                    }
                });
            });
        }, true)).then(() => mapResult());
    }

    /**
     * Queues a statement for execution on the active transaction.
     *
     * @param {string} sql - Statement text.
     * @param {Array} [args=[]] - Values bound to the statement.
     * @returns {Promise<*>} Fulfills with the mapped statement result; rejects
     *     with a WebSQLAdapterError when no transaction is active or the
     *     statement fails.
     */
    sql(sql, args = []) {
        return this._addTask(new Task((tx) => {
            return this._executeSql(tx, sql, args);
        }));
    }

    /**
     * Queues the end of the active transaction. No statement is issued; the
     * processing loop simply stops using (and keeping alive) the transaction.
     *
     * @returns {Promise<*>} Fulfills with `mapResult()`.
     */
    commit() {
        return this._addTask(new Task(() => {
            return Promise.resolve(mapResult());
        }, false, true));
    }

    /**
     * Queues a rollback of the active transaction.
     *
     * @returns {Promise<*>} Fulfills with `mapResult()`; the error that is
     *     provoked on purpose is swallowed.
     */
    rollback() {
        // Hack to manually cause rollback:
        // Intentionally cause an error with rollbackOnError set to true
        return this._addTask(new Task((tx) => {
            return this._executeSql(tx, "", [], true).catch(() => mapResult());
        }, false, true));
    }

    /**
     * Drains the task queue. Only one invocation is active at a time; calls
     * made while the loop is running return immediately.
     *
     * The loop body stays inline on purpose: leaving the loop and clearing
     * `_processing` must happen without an intervening `await`, otherwise a
     * task queued in between would never be picked up.
     *
     * @returns {Promise<void>}
     */
    async _process() {
        if (this._processing) {
            return;
        }
        this._processing = true;

        let tx = null;
        let keepaliveCount = 0;

        try {
            while (true) {
                const tasks = this._tasks;
                this._tasks = [];

                if (tasks.length) {
                    const promises = [];

                    for (const task of tasks) {
                        const promise = task.run(tx);

                        if (task.startsTransaction) {
                            // A failed BEGIN yields no transaction. Keep the
                            // current handle (if any) instead of replacing a
                            // live transaction with `undefined`.
                            const openedTx = await promise;
                            if (openedTx) {
                                tx = openedTx;
                                this._txCount++;
                            }
                        } else {
                            if (task.endsTransaction) {
                                tx = null;
                                keepaliveCount = 0;
                            }

                            promises.push(promise);
                        }
                    }

                    await Promise.all(promises);
                } else if (tx) {
                    // Idle with an open transaction: issue a no-op statement.
                    // NOTE(review): presumably this keeps the transaction from
                    // completing while no statement is pending - confirm.
                    try {
                        await this._nop(tx);
                    } catch (e) {
                        // The transaction can no longer execute statements.
                        // Drop it so the queue keeps working.
                        log(`Transaction: ${ this._txCount } Keepalive failed, dropping transaction: ${ e }`);
                        tx = null;
                        keepaliveCount = 0;
                        continue;
                    }

                    keepaliveCount++;
                    if (keepaliveCount % 5000 === 0) {
                        log(`Transaction: ${ this._txCount } Keepalive: #${ keepaliveCount }`);
                    }
                } else {
                    break;
                }
            }
        } finally {
            // Always release the flag, even on an unexpected error, so later
            // tasks are not queued forever.
            this._processing = false;
        }
    }

    /**
     * Appends a task to the queue and makes sure the queue is being drained.
     *
     * @param {Task} task - Task to enqueue.
     * @returns {Promise<*>} The task's `result` promise.
     */
    _addTask(task) {
        this._tasks.push(task);
        this._process();
        return task.result;
    }

    /**
     * Executes one statement on the given transaction.
     *
     * @param {object} tx - Transaction exposing `executeSql`.
     * @param {string} sql - Statement text.
     * @param {Array} [sqlArgs=[]] - Values bound to the statement.
     * @param {boolean} [rollbackOnError=false] - Returned from the statement
     *     error callback.
     * @returns {Promise<*>} Fulfills with `mapResult(result)`; rejects with a
     *     WebSQLAdapterError when `tx` is missing or the statement fails.
     */
    _executeSql(tx, sql, sqlArgs = [], rollbackOnError = false) {
        return new Promise((resolve, reject) => {
            if (!tx) {
                reject(new WebSQLAdapterError("No transaction. This should not be " +
                    " possible"));
                return;
            }

            tx.executeSql(sql, sqlArgs, (tx, result) => {
                resolve(mapResult(result));
            }, (tx, e) => {
                reject(WebSQLAdapterError.from(e));
                return rollbackOnError;
            });
        });
    }

    /**
     * Runs a statement without side effects on the transaction.
     *
     * @param {object} tx - Transaction to run the statement on.
     * @returns {Promise<*>} Outcome of `SELECT 1`.
     */
    _nop(tx) {
        return this._executeSql(tx, "SELECT 1");
    }
}
|