博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊pg jdbc的queryTimeout及next方法
阅读量:7260 次
发布时间:2019-06-29

本文共 12997 字,大约阅读时间需要 43 分钟。

本文主要介绍一下pg jdbc statement的queryTimeout及resultSet的next方法

实例程序

@Test    public void testReadTimeout() throws SQLException {        Connection connection = dataSource.getConnection();        //https://jdbc.postgresql.org/documentation/head/query.html        connection.setAutoCommit(false); //NOTE 为了设置fetchSize,必须设置为false        String sql = "select * from demo_table";        PreparedStatement pstmt;        try {            pstmt = (PreparedStatement)connection.prepareStatement(sql);            pstmt.setQueryTimeout(1); //NOTE 设置Statement执行完成的超时时间,前提是socket的timeout比这个大            pstmt.setFetchSize(5000); //NOTE 这样设置为了模拟query timeout的异常            System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout());            System.out.println("ps.getFetchSize():" + pstmt.getFetchSize());            System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection());            System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize());            ResultSet rs = pstmt.executeQuery(); //NOTE 设置Statement执行完成的超时时间,前提是socket的timeout比这个大            //NOTE 这里返回了就代表statement执行完成,默认返回fetchSize的数据            int col = rs.getMetaData().getColumnCount();            System.out.println("============================");            while (rs.next()) { //NOTE 这个的timeout由socket的超时时间设置,oracle.jdbc.ReadTimeout=60000                for (int i = 1; i <= col; i++) {                    System.out.print(rs.getObject(i));                }                System.out.println("");            }            System.out.println("============================");        } catch (SQLException e) {            e.printStackTrace();        } finally {            //close resources        }    }

PgStatement

ostgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgStatement.java

executeInternal()

private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags)      throws SQLException {    closeForNextExecution();    // Enable cursor-based resultset if possible.    if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit()        && !wantsHoldableResultSet()) {      flags |= QueryExecutor.QUERY_FORWARD_CURSOR;    }    if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {      flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS;      // If the no results flag is set (from executeUpdate)      // clear it so we get the generated keys results.      //      if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) {        flags &= ~(QueryExecutor.QUERY_NO_RESULTS);      }    }    if (isOneShotQuery(cachedQuery)) {      flags |= QueryExecutor.QUERY_ONESHOT;    }    // Only use named statements after we hit the threshold. Note that only    // named statements can be transferred in binary format.    if (connection.getAutoCommit()) {      flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;    }    // updateable result sets do not yet support binary updates    if (concurrency != ResultSet.CONCUR_READ_ONLY) {      flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER;    }    Query queryToExecute = cachedQuery.query;    if (queryToExecute.isEmpty()) {      flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;    }    if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers        && (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {      // Simple 'Q' execution does not need to know parameter types      // When binaryTransfer is forced, then we need to know resulting parameter and column types,      // thus sending a describe request.      int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY;      StatementResultHandler handler2 = new StatementResultHandler();      connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0,          flags2);      ResultWrapper result2 = handler2.getResults();      if (result2 != null) {        result2.getResultSet().close();      }    }    StatementResultHandler handler = new StatementResultHandler();    result = null;    try {      startTimer();      connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,          fetchSize, flags);    } finally {      killTimerTask();    }    result = firstUnclosedResult = handler.getResults();    if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {      generatedKeys = result;      result = result.getNext();      if (wantsGeneratedKeysOnce) {        wantsGeneratedKeysOnce = false;      }    }  }
注意,这里在执行前后分别调用了startTimer()和killTimerTask()

startTimer()

private void startTimer() {    /*     * there shouldn't be any previous timer active, but better safe than sorry.     */    cleanupTimer();    STATE_UPDATER.set(this, StatementCancelState.IN_QUERY);    if (timeout == 0) {      return;    }    TimerTask cancelTask = new TimerTask() {      public void run() {        try {          if (!CANCEL_TIMER_UPDATER.compareAndSet(PgStatement.this, this, null)) {            // Nothing to do here, statement has already finished and cleared            // cancelTimerTask reference            return;          }          PgStatement.this.cancel();        } catch (SQLException e) {        }      }    };    CANCEL_TIMER_UPDATER.set(this, cancelTask);    connection.addTimerTask(cancelTask, timeout);  }
  • startTimer调用了cleanupTimer()
  • cancelTask调用的是PgStatement.this.cancel()
  • 最后调用connection.addTimerTask添加定时任务

cleanupTimer()

/**   * Clears {@link #cancelTimerTask} if any. Returns true if and only if "cancel" timer task would   * never invoke {@link #cancel()}.   */  private boolean cleanupTimer() {    TimerTask timerTask = CANCEL_TIMER_UPDATER.get(this);    if (timerTask == null) {      // If timeout is zero, then timer task did not exist, so we safely report "all clear"      return timeout == 0;    }    if (!CANCEL_TIMER_UPDATER.compareAndSet(this, timerTask, null)) {      // Failed to update reference -> timer has just fired, so we must wait for the query state to      // become "cancelling".      return false;    }    timerTask.cancel();    connection.purgeTimerTasks();    // All clear    return true;  }
注意这里更新statement状态之后,调用task的cancel,以及connection.purgeTimerTasks()

cancel()

public void cancel() throws SQLException {    if (!STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY, StatementCancelState.CANCELING)) {      // Not in query, there's nothing to cancel      return;    }    try {      // Synchronize on connection to avoid spinning in killTimerTask      synchronized (connection) {        connection.cancelQuery();      }    } finally {      STATE_UPDATER.set(this, StatementCancelState.CANCELLED);      synchronized (connection) {        connection.notifyAll(); // wake-up killTimerTask      }    }  }
executeQuery超时了则直接调用connection.cancelQuery()
public void cancelQuery() throws SQLException {    checkClosed();    queryExecutor.sendQueryCancel();  }

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/QueryExecutorBase.java

public void sendQueryCancel() throws SQLException {    if (cancelPid <= 0) {      return;    }    PGStream cancelStream = null;    // Now we need to construct and send a cancel packet    try {      if (logger.logDebug()) {        logger.debug(" FE=> CancelRequest(pid=" + cancelPid + ",ckey=" + cancelKey + ")");      }      cancelStream =          new PGStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), cancelSignalTimeout);      if (cancelSignalTimeout > 0) {        cancelStream.getSocket().setSoTimeout(cancelSignalTimeout);      }      cancelStream.sendInteger4(16);      cancelStream.sendInteger2(1234);      cancelStream.sendInteger2(5678);      cancelStream.sendInteger4(cancelPid);      cancelStream.sendInteger4(cancelKey);      cancelStream.flush();      cancelStream.receiveEOF();    } catch (IOException e) {      // Safe to ignore.      if (logger.logDebug()) {        logger.debug("Ignoring exception on cancel request:", e);      }    } finally {      if (cancelStream != null) {        try {          cancelStream.close();        } catch (IOException e) {          // Ignored.        }      }    }  }
向数据库server发送cancel指令

killTimerTask()

private void killTimerTask() {    boolean timerTaskIsClear = cleanupTimer();    // The order is important here: in case we need to wait for the cancel task, the state must be    // kept StatementCancelState.IN_QUERY, so cancelTask would be able to cancel the query.    // It is believed that this case is very rare, so "additional cancel and wait below" would not    // harm it.    if (timerTaskIsClear && STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY, StatementCancelState.IDLE)) {      return;    }    // Being here means someone managed to call .cancel() and our connection did not receive    // "timeout error"    // We wait till state becomes "cancelled"    boolean interrupted = false;    while (!STATE_UPDATER.compareAndSet(this, StatementCancelState.CANCELLED, StatementCancelState.IDLE)) {      synchronized (connection) {        try {          // Note: wait timeout here is irrelevant since synchronized(connection) would block until          // .cancel finishes          connection.wait(10);        } catch (InterruptedException e) { // NOSONAR          // Either re-interrupt this method or rethrow the "InterruptedException"          interrupted = true;        }      }    }    if (interrupted) {      Thread.currentThread().interrupt();    }  }
这里先调用cleanupTimer,然后更新statement的状态

PgConnection

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgConnection.java

getTimer()

private synchronized Timer getTimer() {    if (cancelTimer == null) {      cancelTimer = Driver.getSharedTimer().getTimer();    }    return cancelTimer;  }
这里创建或获取一个timer

addTimerTask()

public void addTimerTask(TimerTask timerTask, long milliSeconds) {    Timer timer = getTimer();    timer.schedule(timerTask, milliSeconds);  }
这个添加timerTask就是直接调度了

purgeTimerTasks()

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgConnection.java

public void purgeTimerTasks() {    Timer timer = cancelTimer;    if (timer != null) {      timer.purge();    }  }
在cleanupTimer中被调用,用来清理已经被cancel掉的timer task

PgResultSet

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.java

next()

public boolean next() throws SQLException {    checkClosed();    if (onInsertRow) {      throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."),          PSQLState.INVALID_CURSOR_STATE);    }    if (current_row + 1 >= rows.size()) {      if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) {        current_row = rows.size();        this_row = null;        rowBuffer = null;        return false; // End of the resultset.      }      // Ask for some more data.      row_offset += rows.size(); // We are discarding some data.      int fetchRows = fetchSize;      if (maxRows != 0) {        if (fetchRows == 0 || row_offset + fetchRows > maxRows) {          // Fetch would exceed maxRows, limit it.          fetchRows = maxRows - row_offset;        }      }      // Execute the fetch and update this resultset.      connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);      current_row = 0;      // Test the new rows array.      if (rows.isEmpty()) {        this_row = null;        rowBuffer = null;        return false;      }    } else {      current_row++;    }    initRowBuffer();    return true;  }
这里的fetch没有像executeQuery那样加timer

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java

public synchronized void fetch(ResultCursor cursor, ResultHandler handler, int fetchSize)      throws SQLException {    waitOnLock();    final Portal portal = (Portal) cursor;    // Insert a ResultHandler that turns bare command statuses into empty datasets    // (if the fetch returns no rows, we see just a CommandStatus..)    final ResultHandler delegateHandler = handler;    handler = new ResultHandlerDelegate(delegateHandler) {      public void handleCommandStatus(String status, int updateCount, long insertOID) {        handleResultRows(portal.getQuery(), null, new ArrayList
(), null); } }; // Now actually run it. try { processDeadParsedQueries(); processDeadPortals(); sendExecute(portal.getQuery(), portal, fetchSize); sendSync(); processResults(handler, 0); estimatedReceiveBufferBytes = 0; } catch (IOException e) { abort(); handler.handleError( new PSQLException(GT.tr("An I/O error occurred while sending to the backend."), PSQLState.CONNECTION_FAILURE, e)); } handler.handleCompletion(); }

小结

  • queryTimeout是采用添加timer来控制,如果请求过多,可能会造成timer过多
timeout时间不宜过长,不过正常执行完sql,会调用killTimerTask()方,里头会先cleanupTimer,取消timerTask,然后调用purgeTimerTasks()清理cancel掉的task,避免timeout时间过长导致task堆积最后内存溢出
  • 超时之后会timer task会向数据库server发送cancel query指令
  • 发送完cancel query指令之后,client端的查询按预期应该抛出SQLException(这里头的机制有待深入研究,可能是server端返回timeout error)
  • executeQuery方法默认会拉取fetchSize的数据并返回
  • next()方法根据需要再去fetch,这个fetch方法就没有timer来限制时间了,但是最底层应该是受socketTimeout限制

doc

转载地址:http://makdm.baihongyu.com/

你可能感兴趣的文章
从配置服务器说起......
查看>>
所谓,引用计数
查看>>
webpack文章(持续更新)
查看>>
个人Web前端开发切图PS设置
查看>>
阿里云SLB(负载均衡)获取真实ip地址, log_format配置
查看>>
IE这回在css flex中扳回一局?
查看>>
Asp.Net Web Api 2 实现多文件打包并下载文件示例源码
查看>>
前端模版引擎选择指南
查看>>
事件绑定机制简单实现
查看>>
七牛云音视频新功能:音频支持AAC_HE
查看>>
让你完全理解base64是怎么回事
查看>>
Meteor的临时的存储:Session
查看>>
iptables规则备份和恢复、firewall的zone的操作、service的操作
查看>>
zabbix API 删除host
查看>>
redis 高级特性一
查看>>
李彦宏亲测“自动驾驶汽车”,Apollo(阿波罗)坐镇
查看>>
NAT原理与配置
查看>>
Linux字符模式下的“远程桌面共享”及屏幕录制
查看>>
详解linux系列之sendmail邮箱服务的安装及配置
查看>>
nagios一键安装脚本
查看>>