Created
July 30, 2020 17:56
-
-
Save tksilicon/f646c5689fc179a64c1d8859e3aeab08 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
Connection, ConnectionOptions, createParams, | |
QueryPostgres, QueryConfig, QueryResult, PostgresError, log | |
} from "./postgres_deps.ts"; | |
/** Transaction processor */ | |
export interface TransactionProcessor<T> { | |
(connection: Connection): Promise<T>; | |
} | |
export class PostgresClient { | |
protected _connection: Connection; | |
constructor(config?: ConnectionOptions | string) { | |
const connectionParams = createParams(config); | |
this._connection = new Connection(connectionParams); | |
} | |
async connect(): Promise<void> { | |
await this._connection.startup(); | |
await this._connection.initSQL(); | |
} | |
// TODO: can we use more specific type for args? | |
async query( | |
text: string | QueryConfig, | |
// deno-lint-ignore no-explicit-any | |
...args: any[] | |
): Promise<QueryResult> { | |
const query = new QueryPostgres(text, ...args); | |
return await this._connection.query(query); | |
} | |
async multiQuery(queries: QueryConfig[]): Promise<QueryResult[]> { | |
const result: QueryResult[] = []; | |
for (const query of queries) { | |
result.push(await this.query(query)); | |
} | |
return result; | |
} | |
async end(): Promise<void> { | |
await this._connection.end(); | |
} | |
// Support `using` module | |
_aenter = this.connect; | |
_aexit = this.end; | |
/** | |
* Use a connection/meant for transaction processor | |
* | |
* @param fn transation processor | |
*/ | |
async useConnection<T>(fn: (conn: Connection) => Promise<T>) { | |
if (!this._connection) { | |
throw new Error("Unconnected"); | |
} | |
try { | |
const result = await fn(this._connection); | |
return result; | |
} catch (error) { | |
throw new PostgresError({severity: "high", code: 'T', message: error.message}); | |
} | |
} | |
/** | |
* Execute a transaction process, and the transaction successfully | |
* returns the return value of the transaction process | |
* @param processor transation processor | |
*/ | |
async transaction<T = any>(processor: TransactionProcessor<T>): Promise<T> { | |
return await this.useConnection(async (connection) => { | |
try { | |
await connection.query(new QueryPostgres("BEGIN")); | |
const result = await processor(connection); | |
await connection.query(new QueryPostgres("COMMIT")); | |
return result; | |
} catch (error) { | |
log.info(`ROLLBACK: ${error.message}`); | |
await connection.query(new QueryPostgres("ROLLBACK")); | |
throw error; | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment