"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.runHermesClientTest2 = exports.runHermesClientTest = exports.RpcRequestResponse = exports.newHermesClient = exports.ContentTypeHandler = void 0; const wsmessages_1 = require("./proto/wsmessages"); const Utils_1 = require("../Utils"); const JsonContentTypeHandler = { webSocketUrlPath: "/api/ws/send_receive_json", sendMessageFromClient(mfc, ws) { const obj = wsmessages_1.MessageFromClient.toJSON(mfc); const jsonStr = JSON.stringify(obj); ws.send(jsonStr); } }; const ProtobufContentTypeHandler = { webSocketUrlPath: "/api/ws/send_receive_proto", sendMessageFromClient(mfc, ws) { const bytes = wsmessages_1.MessageFromClient.encode(mfc).finish(); ws.send(bytes); } }; exports.ContentTypeHandler = { Json: JsonContentTypeHandler, Protobuf: ProtobufContentTypeHandler, }; function newHermesClient(rootUrl, contentTypeHandler) { const hci = new HermesClientImpl(rootUrl, contentTypeHandler); hci.mailbox().then((mbox) => { const correlations = hci.correlations; hci.channelMessageSubscribe({ id: "rpc-inbox", state: "rpc-inbox", readerKey: mbox.readerKey, channel: "rpc-inbox", startSeq: "all" }, (me, msg) => { var _a, _b, _c, _d, _e, _f; if (me.messageBytes) { try { const msg = wsmessages_1.Message.decode(me.messageBytes); const endPoint = (_b = (_a = msg.header) === null || _a === void 0 ? void 0 : _a.rpcHeader) === null || _b === void 0 ? void 0 : _b.endPoint; if (((_d = (_c = msg.header) === null || _c === void 0 ? void 0 : _c.rpcHeader) === null || _d === void 0 ? void 0 : _d.frameType) === wsmessages_1.RpcFrameType.Request && endPoint == "ping") { hci.sendPongResponse(mbox, msg, endPoint); } else { const correlationId = (_f = (_e = msg.header) === null || _e === void 0 ? void 0 : _e.rpcHeader) === null || _f === void 0 ? void 0 : _f.correlationId; if (correlationId) { const resolve = correlations.get(correlationId); if (resolve !== undefined) { resolve(msg); } correlations.delete(correlationId); } } } catch (e) { console.error("error decoding message", e); } } // noop since we are only interested in the correlationId for rpc and that happens in onMessage }); }); // send ping every 30 seconds setInterval(() => hci.sendPing(), 30 * 1000); return hci; } exports.newHermesClient = newHermesClient; /** * Create the mailbox * @param channels * @param rootUrl * @returns */ async function createMailbox(channels, rootUrl) { const mbox = { channels: channels, privateMetadata: {}, publicMetadata: {}, purgeTimeoutInMillis: 0, closeTimeoutInMillis: 0, extraData: {}, }; const mboxObj = wsmessages_1.CreateMailboxRequest.toJSON(mbox); const mboxJson = JSON.stringify(mboxObj); let mailboxResponse = undefined; const response = await fetch(`${rootUrl}/api/create_mailbox`, { method: "POST", headers: { "Content-Type": "application/json", }, body: mboxJson, }); if (response.ok) { const responseJsonStr = await response.text(); mailboxResponse = wsmessages_1.CreateMailboxResponse.fromJSON(JSON.parse(responseJsonStr)); } else { throw new Error(`createMailbox failed with status ${response.status}`); } return mailboxResponse; } class Constants { } Constants.rpcInboxChannelName = "rpc-inbox"; Constants.rpcSentChannelName = "rpc-sent"; class HermesConnection { constructor(clientImpl, mailbox, webSocket) { this.clientImpl = clientImpl; this.mailbox = mailbox; this.webSocket = webSocket; const self = this; webSocket.onmessage = function (event) { if (event.data instanceof ArrayBuffer) { self.onWebSocketBinaryMessage(event.data); } else { self.onWebSocketTextMessage(event.data); } }; webSocket.onclose = function (event) { console.log("HermesConnection websocket closed", event); clientImpl.reconnect(); }; // resend un ack'ed messages clientImpl.sentMessagesWaitingForAck.forEach((smr, idempotentId) => { self.sendSendMessageRequest(smr, false); }); } onWebSocketTextMessage(message) { const jsonObj = JSON.parse(message); const m2c = wsmessages_1.MessageToClient.fromJSON(jsonObj); this.onMessageToClient(m2c); } onWebSocketBinaryMessage(message) { const m2c = wsmessages_1.MessageToClient.decode(new Uint8Array(message)); this.onMessageToClient(m2c); } onMessageToClient(m2c) { var _a, _b, _c; if (m2c.notification !== undefined) { console.log("hermes client received notification " + m2c.notification, m2c.notification); } else if (m2c.messageEnvelope !== undefined) { const me = m2c.messageEnvelope; if (me.messageBytes === undefined) { console.log("hermes client received empty messageEnvelope", m2c.messageEnvelope); } else { const subscriptionId = (_a = me.serverEnvelope) === null || _a === void 0 ? void 0 : _a.subscriptionId; if (subscriptionId) { const activeSub = this.clientImpl.activeSubscriptions.get(subscriptionId); if (activeSub) { const startSeq = (_b = me.serverEnvelope) === null || _b === void 0 ? void 0 : _b.sequence; if (startSeq) { activeSub.protoRawSubscription.startSeq = String(startSeq); } activeSub.onMessageEvent(me); } } } } else if (m2c.sendMessageResponse !== undefined) { const id = (_c = m2c.sendMessageResponse) === null || _c === void 0 ? void 0 : _c.idempotentId; if (id) { this.clientImpl.sentMessagesWaitingForAck.delete(id); } console.log("hermes client received SendMessageResponse", m2c.sendMessageResponse); } else if (m2c.subscribeResponse !== undefined) { console.log("hermes client received subscribeResponse", m2c.subscribeResponse); } else if (m2c.ping !== undefined) { this.webSocket.send(JSON.stringify({ pong: {} })); } else if (m2c.pong !== undefined) { console.log("hermes client received pong"); } } sendMessageFromClient(mfc) { console.log("sending websocket message", mfc); this.clientImpl.contentTypeHandler.sendMessageFromClient(mfc, this.webSocket); } addActiveSubscription(activeSub) { const listeners = this.clientImpl.activeSubscriptions.get(activeSub.subscriptionId); if (listeners) { throw Error(`subscriptionId ${activeSub.subscriptionId} is already subscribed`); } else { this.clientImpl.activeSubscriptions.set(activeSub.subscriptionId, activeSub); } } cdcSubscribe(cdcs, listener) { const subscriptionId = "cdc-" + cdcs.tables.map((t) => t.database + "." + t.table).join("-"); const protoCdcs = { id: subscriptionId, matchers: cdcs.tables, startSeq: cdcs.startSeq, }; this.sendMessageFromClient({ subscribeRequest: { subscriptions: [{ changeDataCapture: protoCdcs }] } }); function onMessage(msg) { const json = new TextDecoder().decode(msg.messageBytes); const cdcEvent = JSON.parse(json); listener(cdcEvent, cdcEvent.data); } this.addActiveSubscription({ subscriptionId: subscriptionId, protoRawSubscription: protoCdcs, onMessageEvent: onMessage, protoSubscription: { changeDataCapture: protoCdcs } }); } channelMessageSubscribe(ms, listener) { this.rawChannelSubscribe(ms, wsmessages_1.Message.decode, listener); } channelSendReceiptSubscribe(ms, listener) { this.rawChannelSubscribe(ms, wsmessages_1.SendReceipt.decode, listener); } rawChannelSubscribe(ms, decoder, listener) { const subscriptionId = ms.id; if (!subscriptionId) { throw new Error("MailboxSubscription id is undefined"); } function onMessage(msg) { if (msg.messageBytes === undefined) { console.error("MessageEnvelope.messageBytes is undefined"); return; } const a = decoder(msg.messageBytes); listener(msg, a); } this.sendMessageFromClient({ subscribeRequest: { subscriptions: [{ mailbox: ms }] } }); this.addActiveSubscription({ subscriptionId: subscriptionId, onMessageEvent: onMessage, protoRawSubscription: ms, protoSubscription: { mailbox: ms } }); } sendPing() { this.sendMessageFromClient({ ping: {} }); } sendSendMessageRequest(smr, registerForAck) { if (registerForAck && smr.idempotentId) { this.clientImpl.sentMessagesWaitingForAck.set(smr.idempotentId, smr); } this.sendMessageFromClient({ sendMessageRequest: smr }); } rawRpcCall(request) { var _a; const emptyBytes = new Uint8Array(0); const correlationId = ((_a = this.mailbox.address) !== null && _a !== void 0 ? _a : "") + "-" + this.clientImpl.correlationIdCounter++; const idempotentId = this.mailbox.address + correlationId; const smr = { channel: Constants.rpcInboxChannelName, to: [request.to], idempotentId: idempotentId, message: { header: { rpcHeader: { correlationId: correlationId, endPoint: request.endPoint, frameType: wsmessages_1.RpcFrameType.Request, errorInfo: undefined, }, sender: this.mailbox.address, contentType: request.contentType, extraHeaders: request.headers, senderSequence: 0, }, serverEnvelope: undefined, senderEnvelope: { created: Date.now(), }, data: request.body !== undefined ? request.body : emptyBytes, }, }; const promise = new Promise((resolve, reject) => { this.clientImpl.correlations.set(correlationId, resolve); }); this.sendSendMessageRequest(smr, true); return promise; } rpcObserverSubscribe(readerKey, listener) { console.log("rpcObserverSubscribe", readerKey); const correlations = new Map(); const msInbox = { id: "rpc-inbox-" + readerKey, readerKey: readerKey, channel: "rpc-inbox", startSeq: "first", }; this.channelMessageSubscribe(msInbox, (me, msg) => { var _a, _b; const correlationId = (_b = (_a = msg.header) === null || _a === void 0 ? void 0 : _a.rpcHeader) === null || _b === void 0 ? void 0 : _b.correlationId; if (correlationId) { var correlation = correlations.get(correlationId); if (!correlation) { correlation = new RpcRequestResponse(correlationId); correlations.set(correlationId, correlation); } correlation.inboxEnvelope = me; correlation.inboxMessage = msg; listener(correlation); } }); const msSent = { id: "rpc-sent-" + readerKey, readerKey: readerKey, channel: "rpc-sent", startSeq: "first", }; this.channelSendReceiptSubscribe(msSent, (me, sr) => { var _a, _b, _c, _d, _e; const msg = (_a = sr.request) === null || _a === void 0 ? void 0 : _a.message; const correlationId = (_e = (_d = (_c = (_b = sr.request) === null || _b === void 0 ? void 0 : _b.message) === null || _c === void 0 ? void 0 : _c.header) === null || _d === void 0 ? void 0 : _d.rpcHeader) === null || _e === void 0 ? void 0 : _e.correlationId; if (correlationId !== undefined) { var correlation = correlations.get(correlationId); if (correlation === undefined) { correlation = new RpcRequestResponse(correlationId); correlations.set(correlationId, correlation); } correlation.sentMessage = msg; correlation.sendReceiptEnvelope = me; correlation.sendReceipt = sr; listener(correlation); } }); } } class HermesClientImpl { constructor(rootUrl, contentTypeHandler) { this.correlationIdCounter = 0; this.correlations = new Map(); this.activeSubscriptions = new Map(); this.sentMessagesWaitingForAck = new Map(); const thisHermesClientImpl = this; this.rootUrl = rootUrl; this.contentTypeHandler = contentTypeHandler; this.mailboxResponseP = createMailbox([Constants.rpcInboxChannelName, Constants.rpcSentChannelName], rootUrl); var tempMailboxResponseP = this.mailboxResponseP; var tempWsUrl = new URL(rootUrl); tempWsUrl.protocol = tempWsUrl.protocol.replace("http", "ws"); tempWsUrl.pathname = contentTypeHandler.webSocketUrlPath; this.wsUrl = tempWsUrl.toString(); this.currentConn = this.newHermesConnection(); } sendPongResponse(mbox, pingMsg, endPoint) { var _a, _b, _c, _d, _e, _f; const correlationId = (_b = (_a = pingMsg.header) === null || _a === void 0 ? void 0 : _a.rpcHeader) === null || _b === void 0 ? void 0 : _b.correlationId; const sender = (_c = pingMsg === null || pingMsg === void 0 ? void 0 : pingMsg.header) === null || _c === void 0 ? void 0 : _c.sender; const contentType = (_e = (_d = pingMsg === null || pingMsg === void 0 ? void 0 : pingMsg.header) === null || _d === void 0 ? void 0 : _d.contentType) !== null && _e !== void 0 ? _e : wsmessages_1.ContentType.UnspecifiedCT; if (correlationId !== undefined && sender !== undefined) { var ping = {}; if (pingMsg.data !== undefined) { if (contentType == wsmessages_1.ContentType.Json) { ping = wsmessages_1.Ping.fromJSON(pingMsg.data); } else { ping = wsmessages_1.Ping.decode(pingMsg.data); } } const pong = { payload: ping.payload }; var data; if (contentType == wsmessages_1.ContentType.Json) { data = new TextEncoder().encode(JSON.stringify(wsmessages_1.Pong.toJSON(pong))); } else { data = wsmessages_1.Pong.encode(pong).finish(); } const idempotentId = mbox.address + correlationId; const smr = { channel: Constants.rpcInboxChannelName, to: [sender], idempotentId: idempotentId, message: { header: { rpcHeader: { correlationId: correlationId, endPoint: endPoint, frameType: wsmessages_1.RpcFrameType.SuccessResponse, errorInfo: undefined, }, sender: mbox.address, contentType: (_f = pingMsg === null || pingMsg === void 0 ? void 0 : pingMsg.header) === null || _f === void 0 ? void 0 : _f.contentType, }, serverEnvelope: undefined, senderEnvelope: { created: Date.now(), }, data: data, }, }; this.withConn((conn) => { conn.sendSendMessageRequest(smr, true); }); } else { console.log("ignoring ping no correlation id", pingMsg); } } reconnect() { this.currentConn = this.newHermesConnection(); } newHermesConnection() { const outerThis = this; return new Promise((resolve, reject) => { this.mailboxResponseP.then((mbox) => { var webSocket = new WebSocket(this.wsUrl); webSocket.binaryType = "arraybuffer"; webSocket.onopen = function (event) { console.log("hermes client websocket opened, sending first message"); const resubscriptions = Object.values(outerThis.activeSubscriptions).map((as) => { return as.protoSubscription; }); // send first message const firstMessage = { senderInfo: { readerKey: mbox.readerKey, address: mbox.address, }, subscriptions: resubscriptions, mailboxTimeoutInMs: 2 * 60 * 1000, // 2 minutes }; const mfc = { firstMessage: firstMessage, }; console.log("sending first message"); outerThis.contentTypeHandler.sendMessageFromClient(mfc, webSocket); console.log("resolving promise"); resolve(new HermesConnection(outerThis, mbox, webSocket)); }; }); }); } mailbox() { return this.mailboxResponseP; } async withConn(fn) { return this.currentConn.then((conn) => fn(conn)); } async withConnP(fn) { return this.currentConn.then((conn) => fn(conn)); } rawRpcCall(request) { return this.withConnP((conn) => { return conn.rawRpcCall(request); }); } cdcSubscribe(cdcs, listener) { this.withConn((conn) => { conn.cdcSubscribe(cdcs, listener); }); } rpcObserverSubscribe(readerKey, listener) { console.log("outer rpcObserverSubscribe", readerKey); this.withConn((conn) => { console.log("inner rpcObserverSubscribe", readerKey); conn.rpcObserverSubscribe(readerKey, listener); }); } channelMessageSubscribe(ms, listener) { this.withConn((conn) => { conn.channelMessageSubscribe(ms, listener); }); } channelSendReceiptSubscribe(ms, listener) { this.withConn((conn) => { conn.channelSendReceiptSubscribe(ms, listener); }); } sendPing() { this.withConn((conn) => { conn.sendPing(); }); } } class RpcRequestResponse { constructor(correlationId) { this.correlationId = correlationId; } role() { const ic = this.isClient(); if (ic) { return "client"; } else if (ic === false) { return "server"; } } contentType() { var _a, _b, _c, _d, _e, _f; const contentType = (_f = (_c = (_b = (_a = this.requestMessage()) === null || _a === void 0 ? void 0 : _a.header) === null || _b === void 0 ? void 0 : _b.contentType) !== null && _c !== void 0 ? _c : (_e = (_d = this.responseMessage()) === null || _d === void 0 ? void 0 : _d.header) === null || _e === void 0 ? void 0 : _e.contentType) !== null && _f !== void 0 ? _f : wsmessages_1.ContentType.UnspecifiedCT; return contentType; } isProtobuf() { return this.contentType() === wsmessages_1.ContentType.Protobuf; } isJson() { return this.contentType() === wsmessages_1.ContentType.Json; } isClient() { var _a, _b, _c, _d, _e, _f; const inboxFrameType = (_c = (_b = (_a = this.inboxMessage) === null || _a === void 0 ? void 0 : _a.header) === null || _b === void 0 ? void 0 : _b.rpcHeader) === null || _c === void 0 ? void 0 : _c.frameType; const sentFrameType = (_f = (_e = (_d = this.sentMessage) === null || _d === void 0 ? void 0 : _d.header) === null || _e === void 0 ? void 0 : _e.rpcHeader) === null || _f === void 0 ? void 0 : _f.frameType; if (sentFrameType === wsmessages_1.RpcFrameType.Request) { return true; } else if (inboxFrameType === wsmessages_1.RpcFrameType.Request) { return false; } } hasRequestAndResponse() { return this.sendReceiptEnvelope && this.inboxEnvelope ? true : false; } timeStarted() { var _a, _b, _c, _d; const ic = this.isClient(); var time = (0, Utils_1.undef)(); if (ic === true) { time = (_b = (_a = this.sendReceiptEnvelope) === null || _a === void 0 ? void 0 : _a.serverEnvelope) === null || _b === void 0 ? void 0 : _b.created; } else if (ic === false) { time = (_d = (_c = this.inboxEnvelope) === null || _c === void 0 ? void 0 : _c.serverEnvelope) === null || _d === void 0 ? void 0 : _d.created; } if (time) { return new Date(time); } } timeStartedL() { var _a, _b, _c, _d; const ic = this.isClient(); var time = (0, Utils_1.undef)(); if (ic === true) { time = (_b = (_a = this.sendReceiptEnvelope) === null || _a === void 0 ? void 0 : _a.serverEnvelope) === null || _b === void 0 ? void 0 : _b.created; } else if (ic === false) { time = (_d = (_c = this.inboxEnvelope) === null || _c === void 0 ? void 0 : _c.serverEnvelope) === null || _d === void 0 ? void 0 : _d.created; } return time; } timeCompleted() { var _a, _b, _c, _d; const ic = this.isClient(); var time = undefined; if (ic === false) { time = (_b = (_a = this.sendReceiptEnvelope) === null || _a === void 0 ? void 0 : _a.serverEnvelope) === null || _b === void 0 ? void 0 : _b.created; } else if (ic === true) { time = (_d = (_c = this.inboxEnvelope) === null || _c === void 0 ? void 0 : _c.serverEnvelope) === null || _d === void 0 ? void 0 : _d.created; } if (time) { return new Date(time); } } durationInMillis() { var _a, _b; const ts = (_a = this.timeStarted()) === null || _a === void 0 ? void 0 : _a.getTime(); const tc = (_b = this.timeCompleted()) === null || _b === void 0 ? void 0 : _b.getTime(); if (ts && tc) { return tc - ts; } } endPoint() { var _a, _b, _c; return (_c = (_b = (_a = this.requestMessage()) === null || _a === void 0 ? void 0 : _a.header) === null || _b === void 0 ? void 0 : _b.rpcHeader) === null || _c === void 0 ? void 0 : _c.endPoint; } requestMessage() { const ic = this.isClient(); if (ic === true) { return this.sentMessage; } else if (ic === false) { return this.inboxMessage; } } requestEnvelope() { const ic = this.isClient(); if (ic === true) { return this.sendReceiptEnvelope; } else if (ic === false) { return this.inboxEnvelope; } } responseMessage() { const ic = this.isClient(); if (ic === true) { return this.inboxMessage; } else if (ic === false) { return this.sentMessage; } } responseEnvelope() { const ic = this.isClient(); if (ic === true) { return this.inboxEnvelope; } else if (ic === false) { return this.sendReceiptEnvelope; } } status() { var _a, _b, _c; const frameType = (_c = (_b = (_a = this.responseMessage()) === null || _a === void 0 ? void 0 : _a.header) === null || _b === void 0 ? void 0 : _b.rpcHeader) === null || _c === void 0 ? void 0 : _c.frameType; if (!frameType) { return ""; } else if (frameType === wsmessages_1.RpcFrameType.ErrorResponse) { return "error"; } else if (frameType === wsmessages_1.RpcFrameType.SuccessResponse) { return "success"; } else { return `Unexpected frame types ${frameType}`; } } async processSchema(reqOrResp, data) { if (this.isJson()) { const jsonStr = new TextDecoder().decode(data); return JSON.parse(jsonStr); } else { const endPoint = this.endPoint(); if (endPoint === undefined) { return { "error": "no endpoint" }; } if (data === undefined) { return {}; } return protobufToJson(endPoint, reqOrResp, data); } } async responseObj() { var _a; return this.processSchema("response", (_a = this.responseMessage()) === null || _a === void 0 ? void 0 : _a.data); } async requestObj() { var _a; return this.processSchema("request", (_a = this.requestMessage()) === null || _a === void 0 ? void 0 : _a.data); } } exports.RpcRequestResponse = RpcRequestResponse; const GlobalClient = { get: (0, Utils_1.memoize)(() => newHermesClient("https://hermes-go.ahsrcm.com", JsonContentTypeHandler)) }; exports.default = GlobalClient; function runHermesClientTest() { } exports.runHermesClientTest = runHermesClientTest; function runHermesClientTest2() { // const hc = newHermesClient("https://hermes-go.ahsrcm.com", ContentType.Protobuf); const hc = newHermesClient("https://hermes-go.ahsrcm.com", JsonContentTypeHandler); hc.mailbox().then((mbox) => { const cdcs = { tables: [ { database: "nefario", table: "service", }, ], startSeq: "new", }; hc.cdcSubscribe(cdcs, (cdcEvent, a) => { console.log("cdcEvent", cdcEvent); }); }); // hc.correlatedRpcReader("rrb07167144dc644a0be22a85301afea7e" , (correlation) => { // console.log("correlation", correlation); // }); } exports.runHermesClientTest2 = runHermesClientTest2; async function protobufToJson(schemaName, frametype, bytes) { // const mboxObj = CreateMailboxRequest.toJSON(mbox); // const mboxJson = JSON.stringify(mboxObj); // let mailboxResponse: CreateMailboxResponse | undefined = undefined; const rootUrl = GlobalClient.get().rootUrl; const response = await fetch(`${rootUrl}/api/proto_to_json?schema=${schemaName}&frametype=${frametype}`, { method: "POST", body: bytes, }); if (response.ok) { const jsonStr = await response.text(); return JSON.parse(jsonStr); } else { throw new Error(`proto_to_json failed with status ${response.status}`); } }