A WebSQL adapter for PawSQLite
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

156 lines
3.4 KiB

import { WebSQLAdapterError } from "./websql_adapter_error.mjs";
import { mapResult } from "./result_mapper.mjs";
import { log } from "./log.mjs";
/**
 * A deferred unit of work.
 *
 * A task wraps a job function and exposes a `result` promise that settles
 * with the job's outcome once `run()` has been called. The two boolean flags
 * are plain markers read by whoever schedules the task.
 */
class Task {
  /**
   * @param {function(*): (Promise<*>|*)} job - Invoked by `run(tx)` with the
   *   value passed to `run`. Its outcome settles `result`.
   * @param {boolean} [startsTransaction=false] - Marks the task as one that
   *   starts a transaction.
   * @param {boolean} [endsTransaction=false] - Marks the task as one that
   *   ends a transaction.
   * @throws {Error} If both flags are set.
   */
  constructor(job, startsTransaction = false, endsTransaction = false) {
    if (startsTransaction && endsTransaction) {
      throw new Error("Task cannot start and end a transaction.");
    }
    this._job = job;
    this.startsTransaction = startsTransaction;
    this.endsTransaction = endsTransaction;
    // Expose the settle functions so run() can complete `result` later.
    this.result = new Promise((resolve, reject) => {
      this._resolve = resolve;
      this._reject = reject;
    });
  }

  /**
   * Runs the job and settles `result` with its outcome.
   *
   * The returned promise never rejects: on success it fulfills with the job's
   * value, on failure it fulfills with `undefined` and the error is delivered
   * through `result` only.
   *
   * @param {*} tx - Value handed to the job.
   * @returns {Promise<*>} The job's value, or `undefined` if the job failed.
   */
  run(tx) {
    let pending;
    try {
      // The job is still invoked synchronously. Promise.resolve() returns a
      // native promise unchanged, so timing is the same as calling .then()
      // on the job's own promise.
      pending = Promise.resolve(this._job(tx));
    } catch (e) {
      // A synchronous throw is treated like an asynchronous failure so that
      // `result` always settles and run() keeps its "never rejects" contract.
      this._reject(e);
      return Promise.resolve();
    }
    return pending.then((result) => {
      this._resolve(result);
      return result;
    }, (e) => {
      this._reject(e);
    });
  }
}
/**
 * Serializes BEGIN / SQL / COMMIT / ROLLBACK requests onto a database object
 * that exposes a WebSQL-style `transaction(callback, errorCallback)` method.
 *
 * Requests are queued as tasks and drained by a single `_process()` loop.
 * While a transaction is open and the queue is empty, the loop keeps issuing
 * a no-op statement ("SELECT 1"); presumably this stops the underlying
 * transaction from being finalized while idle — TODO confirm against the
 * target WebSQL implementation.
 */
export class TransactionManager {
  /**
   * @param {object} db - Object providing `transaction(callback, errorCallback)`.
   */
  constructor(db) {
    this.db = db;
    this._txCount = 0;
    this._tasks = [];
    this._processing = false;
  }

  /**
   * Queues the start of a transaction.
   *
   * @returns {Promise<*>} Fulfills with `mapResult()` once the transaction
   *   object has been obtained; rejects if a transaction is already active or
   *   the database reports an error while opening one.
   */
  begin() {
    return this._addTask(new Task((tx) => {
      return new Promise((resolve, reject) => {
        if (tx) {
          reject(new WebSQLAdapterError("BEGIN called with an active " +
            "transaction. This should not happen"));
          return;
        }
        this.db.transaction((newTx) => {
          resolve(newTx);
        }, (e) => {
          // Without this callback a transaction that fails to open would
          // leave the promise pending forever and stall the queue. If the
          // promise has already been fulfilled, this reject is a no-op.
          reject(WebSQLAdapterError.from(e));
        });
      });
    }, true)).then(() => mapResult());
  }

  /**
   * Queues a statement to run on the active transaction.
   *
   * @param {string} sql - Statement text.
   * @param {Array} [args=[]] - Bound arguments.
   * @returns {Promise<*>} Fulfills with the mapped result; rejects if the
   *   statement fails or no transaction is active.
   */
  sql(sql, args = []) {
    return this._addTask(new Task((tx) => {
      return this._executeSql(tx, sql, args);
    }));
  }

  /**
   * Queues the end of the current transaction. The task itself issues no
   * statement; it only makes the processing loop drop its transaction
   * reference and stop the keepalive.
   *
   * @returns {Promise<*>} Fulfills with `mapResult()`.
   */
  commit() {
    return this._addTask(new Task(() => {
      return Promise.resolve(mapResult());
    }, false, true));
  }

  /**
   * Queues a rollback of the current transaction.
   *
   * @returns {Promise<*>} Fulfills with `mapResult()`; the deliberately
   *   provoked statement error is swallowed.
   */
  rollback() {
    // Hack to manually cause rollback:
    // Intentionally cause an error with rollbackOnError set to true
    return this._addTask(new Task((tx) => {
      return this._executeSql(tx, "", [], true).catch(() => mapResult());
    }, false, true));
  }

  /**
   * Drains the task queue. Only one invocation is active at a time; calls
   * made while it is running return immediately. The loop exits once the
   * queue is empty and no transaction is open.
   *
   * @returns {Promise<void>}
   */
  async _process() {
    if (this._processing) {
      return;
    }
    this._processing = true;
    try {
      let tx = null;
      let keepaliveCount = 0;
      while (true) {
        // Take the current batch; tasks added while it runs land in the
        // fresh array and are picked up on the next iteration.
        const tasks = this._tasks;
        this._tasks = [];
        if (tasks.length) {
          const promises = [];
          for (const task of tasks) {
            // run() is called synchronously for every task in the batch.
            const promise = task.run(tx);
            if (task.startsTransaction) {
              // run() yields undefined when the task failed. In that case
              // keep whatever transaction is already active instead of
              // discarding it, and do not count a transaction that never
              // started.
              const started = await promise;
              if (started) {
                tx = started;
                this._txCount++;
              }
            } else {
              if (task.endsTransaction) {
                tx = null;
                keepaliveCount = 0;
              }
              promises.push(promise);
            }
          }
          await Promise.all(promises);
        } else if (tx) {
          try {
            await this._nop(tx);
          } catch (e) {
            // The no-op failed, so the transaction object is treated as
            // unusable. Drop it rather than letting the rejection escape
            // and leave the queue stuck.
            log(`Transaction: ${ this._txCount } Keepalive failed: ${ e }`);
            tx = null;
            keepaliveCount = 0;
            continue;
          }
          keepaliveCount++;
          if (keepaliveCount % 5000 === 0) {
            log(`Transaction: ${ this._txCount } Keepalive: #${ keepaliveCount }`);
          }
        } else {
          break;
        }
      }
    } finally {
      // Always release the guard so a later _addTask() can restart the loop.
      this._processing = false;
    }
  }

  /**
   * Enqueues a task and makes sure the processing loop is running.
   *
   * @param {Task} task
   * @returns {Promise<*>} The task's `result` promise.
   */
  _addTask(task) {
    this._tasks.push(task);
    this._process();
    return task.result;
  }

  /**
   * Executes one statement on `tx`.
   *
   * @param {object|null} tx - Transaction object providing `executeSql`.
   * @param {string} sql - Statement text.
   * @param {Array} [sqlArgs=[]] - Bound arguments.
   * @param {boolean} [rollbackOnError=false] - Value returned from the
   *   statement error callback.
   * @returns {Promise<*>} Fulfills with `mapResult(result)`; rejects with a
   *   WebSQLAdapterError if `tx` is missing or the statement fails.
   */
  _executeSql(tx, sql, sqlArgs = [], rollbackOnError = false) {
    return new Promise((resolve, reject) => {
      if (!tx) {
        reject(new WebSQLAdapterError("No transaction. This should not be " +
          " possible"));
        return;
      }
      tx.executeSql(sql, sqlArgs, (_tx, result) => {
        resolve(mapResult(result));
      }, (_tx, e) => {
        reject(WebSQLAdapterError.from(e));
        return rollbackOnError;
      });
    });
  }

  /**
   * Issues the keepalive statement.
   *
   * @param {object} tx - Active transaction object.
   * @returns {Promise<*>}
   */
  _nop(tx) {
    return this._executeSql(tx, "SELECT 1");
  }
}