From 71d942687702c3eb85a8f5bdeaa9aa5634712008 Mon Sep 17 00:00:00 2001
From: Josh Gross <jogros@microsoft.com>
Date: Wed, 18 Dec 2019 11:02:14 -0500
Subject: [PATCH] Pull in fixes from testing branch

---
 __tests__/save.test.ts |  25 ++++++-
 src/cacheHttpClient.ts | 166 +++++++++++++++++++++++++----------------
 src/save.ts            |  12 ++-
 3 files changed, 136 insertions(+), 67 deletions(-)

diff --git a/__tests__/save.test.ts b/__tests__/save.test.ts
index ad97fad..b739b4a 100644
--- a/__tests__/save.test.ts
+++ b/__tests__/save.test.ts
@@ -240,6 +240,13 @@ test("save with server error outputs warning", async () => {
     const cachePath = path.resolve(inputPath);
     testUtils.setInput(Inputs.Path, inputPath);
 
+    const cacheId = 4;
+    const reserveCacheMock = jest
+        .spyOn(cacheHttpClient, "reserveCache")
+        .mockImplementationOnce(() => {
+            return Promise.resolve(cacheId);
+        });
+
     const createTarMock = jest.spyOn(tar, "createTar");
 
     const saveCacheMock = jest
@@ -250,13 +257,16 @@ test("save with server error outputs warning", async () => {
 
     await run();
 
+    expect(reserveCacheMock).toHaveBeenCalledTimes(1);
+    expect(reserveCacheMock).toHaveBeenCalledWith(primaryKey);
+
     const archivePath = path.join("/foo/bar", "cache.tgz");
 
     expect(createTarMock).toHaveBeenCalledTimes(1);
     expect(createTarMock).toHaveBeenCalledWith(archivePath, cachePath);
 
     expect(saveCacheMock).toHaveBeenCalledTimes(1);
-    expect(saveCacheMock).toHaveBeenCalledWith(primaryKey, archivePath);
+    expect(saveCacheMock).toHaveBeenCalledWith(cacheId, archivePath);
 
     expect(logWarningMock).toHaveBeenCalledTimes(1);
     expect(logWarningMock).toHaveBeenCalledWith("HTTP Error Occurred");
@@ -289,18 +299,29 @@ test("save with valid inputs uploads a cache", async () => {
     const cachePath = path.resolve(inputPath);
     testUtils.setInput(Inputs.Path, inputPath);
 
+    const cacheId = 4;
+    const reserveCacheMock = jest
+        .spyOn(cacheHttpClient, "reserveCache")
+        .mockImplementationOnce(() => {
+            return Promise.resolve(cacheId);
+        });
+
     const createTarMock = jest.spyOn(tar, "createTar");
+
     const saveCacheMock = jest.spyOn(cacheHttpClient, "saveCache");
 
     await run();
 
+    expect(reserveCacheMock).toHaveBeenCalledTimes(1);
+    expect(reserveCacheMock).toHaveBeenCalledWith(primaryKey);
+
     const archivePath = path.join("/foo/bar", "cache.tgz");
 
     expect(createTarMock).toHaveBeenCalledTimes(1);
     expect(createTarMock).toHaveBeenCalledWith(archivePath, cachePath);
 
     expect(saveCacheMock).toHaveBeenCalledTimes(1);
-    expect(saveCacheMock).toHaveBeenCalledWith(primaryKey, archivePath);
+    expect(saveCacheMock).toHaveBeenCalledWith(cacheId, archivePath);
 
     expect(failedMock).toHaveBeenCalledTimes(0);
 });
diff --git a/src/cacheHttpClient.ts b/src/cacheHttpClient.ts
index 7eee046..9501762 100644
--- a/src/cacheHttpClient.ts
+++ b/src/cacheHttpClient.ts
@@ -16,8 +16,6 @@ import {
 } from "./contracts";
 import * as utils from "./utils/actionUtils";
 
-const MAX_CHUNK_SIZE = 4000000; // 4 MB Chunks
-
 function isSuccessStatusCode(statusCode: number): boolean {
     return statusCode >= 200 && statusCode < 300;
 }
@@ -50,18 +48,20 @@ function getRequestOptions(): IRequestOptions {
     return requestOptions;
 }
 
-export async function getCacheEntry(
-    keys: string[]
-): Promise<ArtifactCacheEntry | null> {
-    const cacheUrl = getCacheApiUrl();
+function createRestClient(): RestClient {
     const token = process.env["ACTIONS_RUNTIME_TOKEN"] || "";
     const bearerCredentialHandler = new BearerCredentialHandler(token);
 
-    const resource = `cache?keys=${encodeURIComponent(keys.join(","))}`;
-
-    const restClient = new RestClient("actions/cache", cacheUrl, [
+    return new RestClient("actions/cache", getCacheApiUrl(), [
         bearerCredentialHandler
     ]);
+}
+
+export async function getCacheEntry(
+    keys: string[]
+): Promise<ArtifactCacheEntry | null> {
+    const restClient = createRestClient();
+    const resource = `cache?keys=${encodeURIComponent(keys.join(","))}`;
 
     const response = await restClient.get<ArtifactCacheEntry>(
         resource,
@@ -106,97 +106,108 @@ export async function downloadCache(
     await pipeResponseToStream(downloadResponse, stream);
 }
 
-// Returns Cache ID
-async function reserveCache(
-    restClient: RestClient,
-    key: string
-): Promise<number> {
+// Reserve Cache
+export async function reserveCache(key: string): Promise<number> {
+    const restClient = createRestClient();
+
     const reserveCacheRequest: ReserveCacheRequest = {
         key
     };
     const response = await restClient.create<ReserverCacheResponse>(
         "caches",
-        reserveCacheRequest
+        reserveCacheRequest,
+        getRequestOptions()
     );
 
-    return response?.result?.cacheId || -1;
+    return response?.result?.cacheId ?? -1;
 }
 
-function getContentRange(start: number, length: number): string {
+function getContentRange(start: number, end: number): string {
     // Format: `bytes start-end/filesize
     // start and end are inclusive
     // filesize can be *
     // For a 200 byte chunk starting at byte 0:
     // Content-Range: bytes 0-199/*
-    return `bytes ${start}-${start + length - 1}/*`;
+    return `bytes ${start}-${end}/*`;
 }
 
 async function uploadChunk(
     restClient: RestClient,
-    cacheId: number,
-    data: Buffer,
-    offset: number
+    resourceUrl: string,
+    data: NodeJS.ReadableStream,
+    start: number,
+    end: number
 ): Promise<IRestResponse<void>> {
+    core.debug(
+        `Uploading chunk of size ${end -
+            start +
+            1} bytes at offset ${start} with content range: ${getContentRange(
+            start,
+            end
+        )}`
+    );
     const requestOptions = getRequestOptions();
     requestOptions.additionalHeaders = {
         "Content-Type": "application/octet-stream",
-        "Content-Range": getContentRange(offset, data.byteLength)
+        "Content-Range": getContentRange(start, end)
     };
 
-    return await restClient.update(
-        cacheId.toString(),
-        data.toString("utf8"),
+    return await restClient.uploadStream<void>(
+        "PATCH",
+        resourceUrl,
+        data,
         requestOptions
     );
 }
 
-async function commitCache(
+async function uploadFile(
     restClient: RestClient,
     cacheId: number,
-    filesize: number
-): Promise<IRestResponse<void>> {
-    const requestOptions = getRequestOptions();
-    const commitCacheRequest: CommitCacheRequest = { size: filesize };
-    return await restClient.create(
-        cacheId.toString(),
-        commitCacheRequest,
-        requestOptions
-    );
-}
-
-export async function saveCache(
-    key: string,
     archivePath: string
 ): Promise<void> {
-    const token = process.env["ACTIONS_RUNTIME_TOKEN"] || "";
-    const bearerCredentialHandler = new BearerCredentialHandler(token);
-
-    const restClient = new RestClient("actions/cache", getCacheApiUrl(), [
-        bearerCredentialHandler
-    ]);
-
-    // Reserve Cache
-    const cacheId = await reserveCache(restClient, key);
-    if (cacheId < 0) {
-        throw new Error(`Unable to reserve cache.`);
-    }
-
     // Upload Chunks
-    const stream = fs.createReadStream(archivePath);
-    let streamIsClosed = false;
-    stream.on("close", () => {
-        streamIsClosed = true;
-    });
+    const fileSize = fs.statSync(archivePath).size;
+    const resourceUrl = getCacheApiUrl() + "caches/" + cacheId.toString();
+    const responses: IRestResponse<void>[] = [];
+    const fd = fs.openSync(archivePath, "r");
 
-    const uploads: Promise<IRestResponse<void>>[] = [];
+    const concurrency = 4; // # of HTTP requests in parallel
+    const MAX_CHUNK_SIZE = 32000000; // 32 MB Chunks
+    core.debug(`Concurrency: ${concurrency} and Chunk Size: ${MAX_CHUNK_SIZE}`);
+
+    const parallelUploads = [...new Array(concurrency).keys()];
+    core.debug("Awaiting all uploads");
     let offset = 0;
-    while (!streamIsClosed) {
-        const chunk: Buffer = stream.read(MAX_CHUNK_SIZE);
-        uploads.push(uploadChunk(restClient, cacheId, chunk, offset));
-        offset += MAX_CHUNK_SIZE;
-    }
+    await Promise.all(
+        parallelUploads.map(async () => {
+            while (offset < fileSize) {
+                const chunkSize =
+                    offset + MAX_CHUNK_SIZE > fileSize
+                        ? fileSize - offset
+                        : MAX_CHUNK_SIZE;
+                const start = offset;
+                const end = offset + chunkSize - 1;
+                offset += MAX_CHUNK_SIZE;
+                const chunk = fs.createReadStream(archivePath, {
+                    fd,
+                    start,
+                    end,
+                    autoClose: false
+                });
+                responses.push(
+                    await uploadChunk(
+                        restClient,
+                        resourceUrl,
+                        chunk,
+                        start,
+                        end
+                    )
+                );
+            }
+        })
+    );
 
-    const responses = await Promise.all(uploads);
+    fs.closeSync(fd);
 
     const failedResponse = responses.find(
         x => !isSuccessStatusCode(x.statusCode)
@@ -207,7 +218,34 @@ export async function saveCache(
         );
     }
 
+    return;
+}
+
+async function commitCache(
+    restClient: RestClient,
+    cacheId: number,
+    filesize: number
+): Promise<IRestResponse<void>> {
+    const requestOptions = getRequestOptions();
+    const commitCacheRequest: CommitCacheRequest = { size: filesize };
+    return await restClient.create(
+        `caches/${cacheId.toString()}`,
+        commitCacheRequest,
+        requestOptions
+    );
+}
+
+export async function saveCache(
+    cacheId: number,
+    archivePath: string
+): Promise<void> {
+    const restClient = createRestClient();
+
+    core.debug("Upload cache");
+    await uploadFile(restClient, cacheId, archivePath);
+
     // Commit Cache
+    core.debug("Commiting cache");
     const cacheSize = utils.getArchiveFileSize(archivePath);
     const commitCacheResponse = await commitCache(
         restClient,
diff --git a/src/save.ts b/src/save.ts
index 5b53eb4..9233f08 100644
--- a/src/save.ts
+++ b/src/save.ts
@@ -34,6 +34,15 @@ async function run(): Promise<void> {
             return;
         }
 
+        core.debug("Reserving Cache");
+        const cacheId = await cacheHttpClient.reserveCache(primaryKey);
+        if (cacheId < 0) {
+            core.info(
+                `Unable to reserve cache with key ${primaryKey}, another job may be creating this cache.`
+            );
+            return;
+        }
+        core.debug(`Cache ID: ${cacheId}`);
         const cachePath = utils.resolvePath(
             core.getInput(Inputs.Path, { required: true })
         );
@@ -59,7 +68,8 @@ async function run(): Promise<void> {
             return;
         }
 
-        await cacheHttpClient.saveCache(primaryKey, archivePath);
+        core.debug("Saving Cache");
+        await cacheHttpClient.saveCache(cacheId, archivePath);
     } catch (error) {
         utils.logWarning(error.message);
     }