Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL Incompatibility. Pipeline operaitons on a MYSQL2 Query Stream hang when the connection is lost. #3082

Open
markddrake opened this issue Sep 29, 2024 · 1 comment

Comments

@markddrake
Copy link

markddrake commented Sep 29, 2024

Please note that this is a case where the behavior of MySQL2 diverges significantly from MySQL.

Testcase for MySQL

import {pipeline} from 'stream/promises'
import {Transform} from 'stream'

import mysql from 'mysql'
import fs from 'fs'


class MyTransform extends Transform {

  constructor() {
	super({objectMode: true })
	this.counter = 0
  }
    
  _transform(data,enc,callback) {
    this.push(JSON.stringify(data))
	callback()
  }
 
}

class Test { 

	vendorProperties = {
	  "multipleStatements":true
	 ,"typeCast":true
	 ,"supportBigNumbers":true
	 ,"bigNumberStrings":true
	 ,"dateStrings":true 
	 ,"trace":true
	 ,"user":"root"
	 ,"password": "oracle"
	 ,"host":"yadamu-db2"
	 ,"database":"mysql"
	 ,"port":33061
	 , infileStreamFactory : (path) => {return fs.createReadStream(path)}
	 }  
	 
  async createConnectionPool() {
    
    let stack, operation
	
    try {
      stack = new Error().stack;
      operation = 'mysql.createPool()'  
      this.pool = mysql.createPool(this.vendorProperties)
      console.log('Pool Created')
	} catch (e) {
      throw e
    }
    
	
  }

  async getConnectionFromPool() {

    const connection = await new Promise((resolve,reject) => {
      this.pool.getConnection((err,connection) => {
        if (err) {
          reject(this.getDatabaseException(this.DRIVER_ID,err,stack,'mysql.Pool.getConnection()'))
        }
        resolve(connection)
      })
    })
	
	return connection
  }
  
  async closeConnection(options) {


    if ((this.connection !== undefined) && (typeof this.connection.release === 'function')) {
      let stack;
      try {
        stack = new Error().stack
        await this.connection.release()
        this.connection = undefined;
      } catch (e) {
        this.connection = undefined;
        throw e
      }
    }
  };
      
  async closePool(options) {
      
      
    if ((this.pool !== undefined) && (typeof this.pool.end === 'function')) {
      let stack;
      try {
        stack = new Error().stack
        await this.pool.end()
        this.pool = undefined;
      } catch (e) {
        this.pool = undefined;
        throw e
      }
    }
  }
  executeSQL(sqlStatement,args) {
    
    return new Promise((resolve,reject) => {
  
      const stack = new Error().stack;
      const sqlStartTime = performance.now() 
      this.connection.query(sqlStatement,args,async (err,results,fields) => {
		console.log(results)
        resolve(results)
      })
    })
  }  
  
  async getConnectionID() {
    const results = await this.executeSQL(`select connection_id() "pid"`)
    const pid = results[0].pid
    return pid
  }
  
  async test() {
	  let results
	  try {
        await this.createConnectionPool()
	    this.connection = await this.getConnectionFromPool()
		const pid = await this.getConnectionID()
		
		const connection2 = await this.getConnectionFromPool()
		
		results = await this.executeSQL(`SET AUTOCOMMIT = 0, TIME_ZONE = '+00:00',SESSION INTERACTIVE_TIMEOUT = 600000, WAIT_TIMEOUT = 600000, SQL_MODE='ANSI_QUOTES,PAD_CHAR_TO_FULL_LENGTH', GROUP_CONCAT_MAX_LEN = 1024000, GLOBAL LOCAL_INFILE = 'ON'`);

		results = await this.executeSQL(`select count(*) from "WWI_Warehouse"."ColdRoomTemperatures_Archive"`);
		console.log(results)

        const is = this.connection.query({sql: `select * from "WWI_Warehouse"."ColdRoomTemperatures_Archive"`, rowsAsArray: true}).stream()
		// const is = fs.createReadStream('input.txt')
        is.on('error',(err) => {
	      console.log(is.constructor.name,err)
	      err.pipelineComponents = [...err.pipelineComponents || [],is.constructor.name]
        }).on
        const t = new MyTransform()
        t.on('error',(err) => {
	      console.log(t.constructor.name,err)
	      err.pipelineComponents = [...err.pipelineComponents || [],t.constructor.name]
        })
		
        const os =  fs.createWriteStream('output.txt')
        os.on('error',(err) => {
	      console.log(os.constructor.name,err)
          err.pipelineComponents = [...err.pipelineComponents || [],os.constructor.name]
        })
  
  
        const streams = [is,t,os]
        console.log(streams.map((s) => { return s.constructor.name }).join(' => '))
        try {
	      console.log('Start Pipeline')
   	      setTimeout(async () => {
			 console.log('Kill',pid)
	         const operation = `kill ${pid}`
		     const res = await connection2.query(operation);
             connection2.release() 
	      },5000)
          await pipeline(...streams);
          console.log('End Pipeline')
        } catch (err) {
          console.error('Pipeline error:', err);
        }    
		console.log('Done')
		
 	    await this.closeConnection();
		await this.closePool();
	  } catch (e) {
		 console.log(e)
 	    await this.closeConnection();
		await this.closePool();
		console.log(e)
	  }
  }
}


process.on('unhandledRejection', (e,p) => {
  console.log("Unhandled",e,p)
})

const test = new Test();
test.test().then(() => {
 console.log('Success')
}).catch((e) => {
  console.log(e)
})

When run generates the following expected output.

C:\Development\YADAMU\src\scratch\mysql>node lostConnectionMySQL.js
Pool Created
[ RowDataPacket { pid: '35' } ]
OkPacket {
  fieldCount: 0,
  affectedRows: 0,
  insertId: 0,
  serverStatus: 0,
  warningCount: 1,
  message: '',
  protocol41: true,
  changedRows: 0
}
[ RowDataPacket { 'count(*)': '3654736' } ]
[ RowDataPacket { 'count(*)': '3654736' } ]
Readable => MyTransform => WriteStream
Start Pipeline
Kill 35
Readable Error: Connection lost: The server closed the connection.
    at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
    at Socket.emit (node:events:532:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
    --------------------
    at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
    at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
    at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  fatal: true,
  code: 'PROTOCOL_CONNECTION_LOST'
}
Readable Error: Connection lost: The server closed the connection.
    at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
    at Socket.emit (node:events:532:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
    --------------------
    at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
    at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
    at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  fatal: true,
  code: 'PROTOCOL_CONNECTION_LOST',
  pipelineComponents: [ 'Readable' ]
}
MyTransform Error: Connection lost: The server closed the connection.
    at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
    at Socket.emit (node:events:532:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
    --------------------
    at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
    at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
    at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  fatal: true,
  code: 'PROTOCOL_CONNECTION_LOST',
  pipelineComponents: [ 'Readable', 'Readable' ]
}
Pipeline error: Error: Connection lost: The server closed the connection.
    at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
    at Socket.emit (node:events:532:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
    --------------------
    at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
    at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
    at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  fatal: true,
  code: 'PROTOCOL_CONNECTION_LOST',
  pipelineComponents: [ 'Readable', 'Readable', 'MyTransform' ]
}
Done
Success
WriteStream Error: Connection lost: The server closed the connection.
    at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
    at Socket.emit (node:events:532:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
    --------------------
    at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
    at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
    at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  fatal: true,
  code: 'PROTOCOL_CONNECTION_LOST',
  pipelineComponents: [ 'Readable', 'Readable', 'MyTransform' ]
}

C:\Development\YADAMU\src\scratch\mysql>
C:\Development\YADAMU\src\scratch\mysql>
C:\Development\YADAMU\src\scratch\mysql>node lostConnectionMySQL.js
Pool Created
[ RowDataPacket { pid: '37' } ]
OkPacket {
  fieldCount: 0,
  affectedRows: 0,
  insertId: 0,
  serverStatus: 0,
  warningCount: 1,
  message: '',
  protocol41: true,
  changedRows: 0
}
[ RowDataPacket { 'count(*)': '3654736' } ]
[ RowDataPacket { 'count(*)': '3654736' } ]
Readable => MyTransform => WriteStream
Start Pipeline
Kill 37
Readable Error: Connection lost: The server closed the connection.
    at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
    at Socket.emit (node:events:532:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
    --------------------
    at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
    at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
    at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  fatal: true,
  code: 'PROTOCOL_CONNECTION_LOST'
}
Readable Error: Connection lost: The server closed the connection.
    at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
    at Socket.emit (node:events:532:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
    --------------------
    at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
    at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
    at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  fatal: true,
  code: 'PROTOCOL_CONNECTION_LOST',
  pipelineComponents: [ 'Readable' ]
}
MyTransform Error: Connection lost: The server closed the connection.
    at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
    at Socket.emit (node:events:532:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
    --------------------
    at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
    at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
    at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  fatal: true,
  code: 'PROTOCOL_CONNECTION_LOST',
  pipelineComponents: [ 'Readable', 'Readable' ]
}
Pipeline error: Error: Connection lost: The server closed the connection.
    at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
    at Socket.emit (node:events:532:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
    --------------------
    at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
    at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
    at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  fatal: true,
  code: 'PROTOCOL_CONNECTION_LOST',
  pipelineComponents: [ 'Readable', 'Readable', 'MyTransform' ]
}
Done
Success
WriteStream Error: Connection lost: The server closed the connection.
    at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
    at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
    at Socket.emit (node:events:532:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
    --------------------
    at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
    at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
    at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  fatal: true,
  code: 'PROTOCOL_CONNECTION_LOST',
  pipelineComponents: [ 'Readable', 'Readable', 'MyTransform' ]
}

When connection 1 is killed the pipeline terminates with a "The Server Closed the connection'. This allows the code to handle the lost connection in a real world enviroment.

When the MySQL2 variant of the code is run

import {pipeline} from 'stream/promises'
import {Transform} from 'stream'

import mysql from 'mysql2/promise'
import fs from 'fs'


class MyTransform extends Transform {

  constructor() {
	super({objectMode: true })
	this.counter = 0
  }
    
  _transform(data,enc,callback) {
    this.push(JSON.stringify(data))
	callback()
  }
 
}

class Test { 

	vendorProperties = {
	  "multipleStatements":true
	 ,"typeCast":true
	 ,"supportBigNumbers":true
	 ,"bigNumberStrings":true
	 ,"dateStrings":true	 
	 ,"trace":true
	 ,"user":"root"
	 ,"password": "oracle"
	 ,"host":"yadamu-db2"
	 ,"database":"mysql"
	 ,"port":33061
	 , infileStreamFactory : (path) => {return fs.createReadStream(path)}
	 }  
	 
  async createConnectionPool() {
    
    let stack, operation
	
    try {
      stack = new Error().stack;
      operation = 'mysql.createPool()'  
      this.pool = mysql.createPool(this.vendorProperties)
      console.log('Pool Created')
	} catch (e) {
      throw e
    }
    
	
  }

  async getConnectionFromPool() {

    let stack
    
    try {    
      stack = new Error().stack;
      const connection = await this.pool.getConnection()
	  console.log('Connection obtained')
      return connection
	} catch (err) {
	  throw err 
    }
  }
  
  async closeConnection(options) {


    if ((this.connection !== undefined) && (typeof this.connection.release === 'function')) {
      let stack;
      try {
        stack = new Error().stack
        await this.connection.release()
        this.connection = undefined;
      } catch (e) {
        this.connection = undefined;
        throw e
      }
    }
  };
      
  async closePool(options) {
      
      
    if ((this.pool !== undefined) && (typeof this.pool.end === 'function')) {
      let stack;
      try {
        stack = new Error().stack
        await this.pool.end()
        this.pool = undefined;
      } catch (e) {
        this.pool = undefined;
        throw e
      }
    }
  }
 
  async executeSQL(sqlStatement,args) {
    

    let stack
	let results

      try {
        stack = new Error().stack;
		const [results,fields] = await this.connection.query(sqlStatement,args)
		return results;
      } catch (e) {
        throw e
      }
  }
  
  async getConnectionID() {
    const results = await this.executeSQL(`select connection_id() "pid"`)
    const pid = results[0].pid
    return pid
  }
  
  async test() {
	  let results
	  try {
        await this.createConnectionPool()
	    this.connection = await this.getConnectionFromPool()
		const pid = await this.getConnectionID()
		
		const connection2 = await this.getConnectionFromPool()
		
		results = await this.executeSQL(`SET AUTOCOMMIT = 0, TIME_ZONE = '+00:00',SESSION INTERACTIVE_TIMEOUT = 600000, WAIT_TIMEOUT = 600000, SQL_MODE='ANSI_QUOTES,PAD_CHAR_TO_FULL_LENGTH', GROUP_CONCAT_MAX_LEN = 1024000, GLOBAL LOCAL_INFILE = 'ON'`);

		results = await this.executeSQL(`select count(*) from "WWI_Warehouse"."ColdRoomTemperatures_Archive"`);
		console.log(results)

        const is = this.connection.connection.query({sql: `select * from "WWI_Warehouse"."ColdRoomTemperatures_Archive"`, rowsAsArray: true}).stream()
		// const is = fs.createReadStream('input.txt')
        is.on('error',(err) => {
	      console.log(is.constructor.name,err)
	      err.pipelineComponents = [...err.pipelineComponents || [],is.constructor.name]
        }).on
        const t = new MyTransform()
        t.on('error',(err) => {
	      console.log(t.constructor.name,err)
	      err.pipelineComponents = [...err.pipelineComponents || [],t.constructor.name]
        })
		
        const os =  fs.createWriteStream('output.txt')
        os.on('error',(err) => {
	      console.log(os.constructor.name,err)
          err.pipelineComponents = [...err.pipelineComponents || [],os.constructor.name]
        })
  
  
        const streams = [is,t,os]
        console.log(streams.map((s) => { return s.constructor.name }).join(' => '))
        try {
	      console.log('Start Pipeline')
   	      setTimeout(async () => {
			 console.log('Kill',pid)
	         const operation = `kill ${pid}`
		     const res = await connection2.query(operation);
             connection2.release() 
	      },5000)
          await pipeline(...streams);
          console.log('End Pipeline')
        } catch (err) {
          console.error('Pipeline error:', err);
        }    
		console.log('Done')
		
 	    await this.closeConnection();
		await this.closePool();
	  } catch (e) {
		 console.log(e)
 	    await this.closeConnection();
		await this.closePool();
		console.log(e)
	  }
  }
}


process.on('unhandledRejection', (e,p) => {
  console.log("Unhandled",e,p)
})

const test = new Test();
test.test().then(() => {
 console.log('Success')
}).catch((e) => {
  console.log(e)
})

The process hangs

C:\Development\YADAMU\src\scratch\mysql>node lostConnectionMySQL2.js
Pool Created
Connection obtained
Connection obtained
[ { 'count(*)': '3654736' } ]
Readable => MyTransform => WriteStream
Start Pipeline
Kill 39

The above was captured 5 min after the KILL was executed.

The environment used is shown here

Similar behavior is seen on linux.

@markddrake
Copy link
Author

Zip file with testdata can be made available if required (but any large table that takes more than 5 seconds to process should work)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants