Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 46 additions & 74 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import thrift from 'thrift';
import Int64 from 'node-int64';

import { EventEmitter } from 'events';
import TCLIService from '../thrift/TCLIService';
import { TProtocolVersion } from '../thrift/TCLIService_types';
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
import IDriver from './contracts/IDriver';
import IClientContext, { ClientConfig } from './contracts/IClientContext';
Expand All @@ -14,9 +12,11 @@ import IDBSQLSession from './contracts/IDBSQLSession';
import IAuthentication from './connection/contracts/IAuthentication';
import HttpConnection from './connection/connections/HttpConnection';
import IConnectionOptions from './connection/contracts/IConnectionOptions';
import Status from './dto/Status';
import HiveDriverError from './errors/HiveDriverError';
import { buildUserAgentString, definedOrError, serializeQueryTags } from './utils';
import { buildUserAgentString } from './utils';
import IBackend from './contracts/IBackend';
import ThriftBackend from './thrift-backend/ThriftBackend';
import SeaBackend from './sea/SeaBackend';
import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
import DatabricksOAuth, { OAuthFlow } from './connection/auth/DatabricksOAuth';
import {
Expand All @@ -39,19 +39,6 @@ function prependSlash(str: string): string {
return str;
}

function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) {
if (!catalogName && !schemaName) {
return {};
}

return {
initialNamespace: {
catalogName,
schemaName,
},
};
}

export type ThriftLibrary = Pick<typeof thrift, 'createClient'>;

export default class DBSQLClient extends EventEmitter implements IDBSQLClient, IClientContext {
Expand All @@ -75,6 +62,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

private readonly sessions = new CloseableCollection<DBSQLSession>();

private backend?: IBackend;

private static getDefaultLogger(): IDBSQLLogger {
if (!this.defaultLogger) {
this.defaultLogger = new DBSQLLogger();
Expand Down Expand Up @@ -248,38 +237,45 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

this.connectionProvider = this.createConnectionProvider(options);

const thriftConnection = await this.connectionProvider.getThriftConnection();

thriftConnection.on('error', (error: Error) => {
// Error.stack already contains error type and message, so log stack if available,
// otherwise fall back to just error type + message
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
try {
this.emit('error', error);
} catch (e) {
// EventEmitter will throw unhandled error when emitting 'error' event.
// Since we already logged it few lines above, just suppress this behaviour
}
});

thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => {
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`);
this.emit('reconnecting', params);
});

thriftConnection.on('close', () => {
this.logger.log(LogLevel.debug, 'Closing connection.');
this.emit('close');
});
this.backend = options.useSEA
? new SeaBackend({ context: this })
: new ThriftBackend({
context: this,
onConnectionEvent: (event, payload) => this.forwardConnectionEvent(event, payload),
});

thriftConnection.on('timeout', () => {
this.logger.log(LogLevel.debug, 'Connection timed out.');
this.emit('timeout');
});
await this.backend.connect(options);

return this;
}

private forwardConnectionEvent(event: 'error' | 'reconnecting' | 'close' | 'timeout', payload?: unknown): void {
switch (event) {
case 'error': {
const error = payload as Error;
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
try {
this.emit('error', error);
} catch (e) {
// EventEmitter throws when 'error' has no listeners; we've already logged it.
}
return;
}
case 'reconnecting':
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(payload)}`);
this.emit('reconnecting', payload);
return;
case 'close':
this.logger.log(LogLevel.debug, 'Closing connection.');
this.emit('close');
return;
case 'timeout':
this.logger.log(LogLevel.debug, 'Connection timed out.');
this.emit('timeout');
// no default
}
}

/**
* Starts new session
* @public
Expand All @@ -290,44 +286,20 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
* const session = await client.openSession();
*/
public async openSession(request: OpenSessionRequest = {}): Promise<IDBSQLSession> {
// Prepare session configuration
const configuration = request.configuration ? { ...request.configuration } : {};

// Add metric view metadata config if enabled
if (this.config.enableMetricViewMetadata) {
configuration['spark.sql.thriftserver.metadata.metricview.enabled'] = 'true';
}

// Serialize queryTags dict and set in configuration; takes precedence over configuration.QUERY_TAGS
if (request.queryTags !== undefined) {
const serialized = serializeQueryTags(request.queryTags);
if (serialized) {
configuration.QUERY_TAGS = serialized;
} else {
delete configuration.QUERY_TAGS;
}
if (!this.backend) {
throw new HiveDriverError('DBSQLClient: not connected');
}

const response = await this.driver.openSession({
client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8),
...getInitialNamespaceOptions(request.initialCatalog, request.initialSchema),
configuration,
canUseMultipleCatalogs: true,
});

Status.assert(response.status);
const session = new DBSQLSession({
handle: definedOrError(response.sessionHandle),
context: this,
serverProtocolVersion: response.serverProtocolVersion,
});
const sessionBackend = await this.backend.openSession(request);
const session = new DBSQLSession({ backend: sessionBackend, context: this });
this.sessions.add(session);
return session;
}

public async close(): Promise<void> {
await this.sessions.closeAll();
await this.backend?.close();

this.backend = undefined;
this.client = undefined;
this.connectionProvider = undefined;
this.authProvider = undefined;
Expand Down
Loading
Loading