CEC-4523: add bulk archive to /packages (#372)

* CEC-4523: add bulk archive to /packages
This commit is contained in:
Tristan Timblin
2023-06-26 12:35:17 -04:00
committed by GitHub
parent 787bb12260
commit 60c1f414a6
6 changed files with 328 additions and 67 deletions

View File

@@ -1,36 +1,67 @@
export default class TaskRunner {
constructor(concurrencyLimit = 1) {
this.queue = [];
this.running = 0;
this.concurrencyLimit = concurrencyLimit;
constructor(concurrencyLimit = 1, total) {
this._queue = [];
this._index = 0;
this._running = 0;
this._complete = 0;
this._concurrencyLimit = concurrencyLimit;
if (total) {
this._total = total;
this._responses = new Array(total);
}
this._onComplete = new Promise((resolve, reject) => {
this._onCompleteResolve = resolve;
this._onCompleteReject = reject;
});
}
execute() {
if (this.running >= this.concurrencyLimit || this.queue.length === 0) {
if (this._running >= this._concurrencyLimit || this._queue.length === 0) {
return;
}
const task = this.queue.shift();
this.running += 1;
task();
const task = this._queue.shift();
this._running += 1;
task(this._index);
this._index += 1;
}
async push(fn) {
return new Promise((resolve, reject) => {
const task = async () => {
const task = async (index) => {
try {
const result = await fn();
resolve(result);
const response = await fn();
if (this._responses) {
this._responses[index] = response;
}
resolve(response);
} catch (error) {
reject(error);
} finally {
this.running -= 1;
this._running -= 1;
this.#progress();
this.execute();
}
}
this.queue.push(task);
this._queue.push(task);
this.execute();
});
}
#progress() {
this._complete += 1;
if (this._complete === this._total) {
this._onCompleteResolve(this._responses);
}
}
async onComplete() {
if (!this._total) {
this._onCompleteReject(new Error("Total is required to determine onComplete."));
}
return this._onComplete;
}
}

View File

@@ -4,6 +4,10 @@ const mockPromise = async (id, ms) => {
await new Promise(resolve => setTimeout(resolve, ms));
return id;
}
const mockPromiseError = async (id, ms) => {
await new Promise(resolve => setTimeout(resolve, ms));
return new Error(`Task ${id} had an error`);
}
const asyncFn1 = () => mockPromise(1, 200);
const asyncFn2 = () => mockPromise(2, 100);
@@ -12,19 +16,19 @@ const asyncFn3 = () => mockPromise(3, 50);
describe("TaskRunner", () => {
it("runs task added to queue, when space available", () => {
const taskRunner = new TaskRunner(2);
expect(taskRunner.running).toEqual(0);
expect(taskRunner._running).toEqual(0);
taskRunner.push(() => mockPromise(1, 300));
expect(taskRunner.running).toEqual(1);
expect(taskRunner._running).toEqual(1);
});
it("keeps task in queue when at concurrency limit", () => {
const taskRunner = new TaskRunner(2);
expect(taskRunner.running).toEqual(0);
expect(taskRunner._running).toEqual(0);
taskRunner.push(() => mockPromise(1, 100));
taskRunner.push(() => mockPromise(2, 25));
taskRunner.push(() => mockPromise(3, 10));
expect(taskRunner.running).toEqual(2);
expect(taskRunner.queue.length).toEqual(1);
expect(taskRunner._running).toEqual(2);
expect(taskRunner._queue.length).toEqual(1);
});
it("runs queued tasks as space becomes available", async () => {
@@ -32,9 +36,9 @@ describe("TaskRunner", () => {
taskRunner.push(() => mockPromise(1, 600));
taskRunner.push(() => mockPromise(2, 300));
taskRunner.push(() => mockPromise(3, 100));
expect(taskRunner.queue.length).toEqual(1);
expect(taskRunner._queue.length).toEqual(1);
await new Promise(r => setTimeout(r, 301));
expect(taskRunner.queue.length).toEqual(0);
expect(taskRunner._queue.length).toEqual(0);
});
it("runs tasks in order", async () => {
@@ -52,7 +56,44 @@ describe("TaskRunner", () => {
.then((id) => {
actual.push(id);
});
await new Promise(resolve => setTimeout(resolve, 500));
expect(actual).toEqual([2, 3, 1]);
await new Promise(resolve => setTimeout(resolve, 500));
expect(actual).toEqual([2, 3, 1]);
});
})
it("resolves a promise when all tasks are complete", async () => {
const taskRunner = new TaskRunner(2, 5);
taskRunner.push(() => mockPromise(1, 600));
taskRunner.push(() => mockPromise(2, 300));
taskRunner.push(() => mockPromise(3, 200));
taskRunner.push(() => mockPromise(4, 600));
taskRunner.push(() => mockPromise(5, 100));
await taskRunner.onComplete().then((actual) => {
expect(actual).toStrictEqual([1, 2, 3, 4, 5]);
});
});
it("resolves a promise when all tasks are complete, even if some fail", async () => {
const error = new Error(`Task 3 had an error`);
const taskRunner = new TaskRunner(2, 5);
taskRunner.push(() => mockPromise(1, 600));
taskRunner.push(() => mockPromise(2, 300));
taskRunner.push(() => mockPromiseError(3, 200));
taskRunner.push(() => mockPromise(4, 600));
taskRunner.push(() => mockPromise(5, 100));
await taskRunner.onComplete().then((actual) => {
expect(actual).toStrictEqual([1, 2, error, 4, 5]);
});
});
it("rejects a promise when the total number of tasks is unknown", async () => {
const taskRunner = new TaskRunner(2);
taskRunner.push(() => mockPromise(1, 600));
taskRunner.push(() => mockPromise(2, 300));
taskRunner.push(() => mockPromise(3, 200));
taskRunner.push(() => mockPromise(4, 600));
taskRunner.push(() => mockPromise(5, 100));
await taskRunner.onComplete().catch((error) => {
expect(error.message).toBe("Total is required to determine onComplete.");
});
});
});