You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
156 lines
3.4 KiB
156 lines
3.4 KiB
import { WebSQLAdapterError } from "./websql_adapter_error.mjs"; |
|
import { mapResult } from "./result_mapper.mjs"; |
|
import { log } from "./log.mjs"; |
|
|
|
|
|
class Task { |
|
constructor(job, startsTransaction=false, endsTransaction=false) { |
|
this._job = job; |
|
|
|
if (startsTransaction && endsTransaction) { |
|
throw new Error("Task cannot start and end a transaction."); |
|
} |
|
|
|
this.startsTransaction = startsTransaction; |
|
this.endsTransaction = endsTransaction; |
|
|
|
this.result = new Promise ((resolve, reject) => { |
|
this._resolve = resolve; |
|
this._reject = reject; |
|
}); |
|
} |
|
|
|
run(tx) { |
|
return this._job(tx).then((result) => { |
|
this._resolve(result); |
|
return result; |
|
}, (e) => { |
|
this._reject(e); |
|
}); |
|
} |
|
} |
|
|
|
|
|
export class TransactionManager { |
|
constructor(db) { |
|
this.db = db; |
|
|
|
this._txCount = 0; |
|
this._tasks = []; |
|
this._processing = false; |
|
} |
|
|
|
begin() { |
|
return this._addTask(new Task((tx) => { |
|
return new Promise((resolve, reject) => { |
|
if (tx) { |
|
reject(new WebSQLAdapterError("BEGIN called with an active " + |
|
"transaction. This should not happen")); |
|
return; |
|
} |
|
this.db.transaction((tx) => { |
|
resolve(tx); |
|
}); |
|
}); |
|
}, true)).then(() => mapResult()); |
|
} |
|
|
|
sql(sql, args = []) { |
|
return this._addTask(new Task((tx) => { |
|
return this._executeSql(tx, sql, args); |
|
})); |
|
} |
|
|
|
commit() { |
|
return this._addTask(new Task((tx) => { |
|
return Promise.resolve(mapResult()); |
|
}, false, true)); |
|
} |
|
|
|
rollback() { |
|
// Hack to manually cause rollback: |
|
// Intentionally cause an error with rollbackOnError set to true |
|
return this._addTask(new Task((tx) => { |
|
return this._executeSql(tx, "", [], true).catch(() => mapResult()); |
|
}, false, true)); |
|
} |
|
|
|
|
|
async _process() { |
|
if (this._processing) { |
|
return; |
|
} |
|
this._processing = true; |
|
|
|
let tx = null; |
|
let keepaliveCount = 0; |
|
|
|
while (true) { |
|
const tasks = this._tasks; |
|
this._tasks = []; |
|
|
|
|
|
if (tasks.length) { |
|
const promises = []; |
|
|
|
for (const task of tasks) { |
|
const promise = task.run(tx); |
|
|
|
if (task.startsTransaction) { |
|
tx = await promise; |
|
this._txCount++; |
|
} else { |
|
if (task.endsTransaction) { |
|
tx = null; |
|
keepaliveCount = 0; |
|
} |
|
|
|
promises.push(promise); |
|
} |
|
} |
|
|
|
await Promise.all(promises); |
|
} else { |
|
if (tx) { |
|
await this._nop(tx); |
|
keepaliveCount++; |
|
if (keepaliveCount % 5000 === 0) { |
|
log(`Transaction: ${ this._txCount } Keepalive: #${ keepaliveCount }`); |
|
} |
|
} else { |
|
break; |
|
} |
|
} |
|
} |
|
|
|
this._processing = false; |
|
} |
|
|
|
|
|
_addTask(task) { |
|
this._tasks.push(task); |
|
this._process(); |
|
return task.result; |
|
} |
|
|
|
|
|
_executeSql(tx, sql, sqlArgs = [], rollbackOnError = false) { |
|
return new Promise((resolve, reject) => { |
|
if (!tx) { |
|
reject(new WebSQLAdapterError("No transaction. This should not be " + |
|
" possible")); |
|
return; |
|
} |
|
tx.executeSql(sql, sqlArgs, (tx, result) => { |
|
resolve(mapResult(result)); |
|
}, (tx, e) => { |
|
reject(WebSQLAdapterError.from(e)); |
|
return rollbackOnError; |
|
}); |
|
}); |
|
} |
|
|
|
_nop(tx) { |
|
return this._executeSql(tx, "SELECT 1"); |
|
} |
|
} |