From 88a9bb65f41edb2bdfafea4b0a8e4f760e43e32c Mon Sep 17 00:00:00 2001 From: Henning Date: Wed, 1 Feb 2012 19:07:38 +0100 Subject: [PATCH] added wait queues per pool - fix issue 20 --- include/emysql.hrl | 2 +- src/emysql.erl | 14 ++-- src/emysql_conn.erl | 51 +++++++------- src/emysql_conn_mgr.erl | 149 ++++++++++++++++++++++------------------ 4 files changed, 118 insertions(+), 98 deletions(-) diff --git a/include/emysql.hrl b/include/emysql.hrl index df1adc77..fc1d77d3 100644 --- a/include/emysql.hrl +++ b/include/emysql.hrl @@ -26,7 +26,7 @@ %% OTHER DEALINGS IN THE SOFTWARE. --record(pool, {pool_id, size, user, password, host, port, database, encoding, available=queue:new(), locked=gb_trees:empty()}). +-record(pool, {pool_id, size, user, password, host, port, database, encoding, available=queue:new(), locked=gb_trees:empty(), waiting=queue:new()}). -record(emysql_connection, {id, pool_id, socket, version, thread_id, caps, language, prepared=gb_trees:empty(), locked_at, alive=true}). -record(greeting, {protocol_version, server_version, thread_id, salt1, salt2, caps, caps_high, language, status, seq_num, plugin}). -record(field, {seq_num, catalog, db, table, org_table, name, org_name, type, default, charset_nr, length, flags, decimals}). diff --git a/src/emysql.erl b/src/emysql.erl index 804439ae..ec443ebc 100644 --- a/src/emysql.erl +++ b/src/emysql.erl @@ -322,8 +322,8 @@ decrement_pool_size(PoolId, Num) when is_integer(Num) -> %% %% Result = emysql:execute(hello_pool, hello_stmt, ["Hello%"]), %% -%% %-% io:format("~n~s~n", [string:chars($-,72)]), -%% %-% io:format("~p~n", [Result]), +%% io:format("~n~s~n", [string:chars($-,72)]), %V% +%% io:format("~p~n", [Result]), %V% %% %% ok. %% ''' @@ -465,7 +465,9 @@ execute(PoolId, StmtName, Timeout) when is_atom(StmtName), is_integer(Timeout) - %% execute(PoolId, Query, Args, Timeout) when (is_list(Query) orelse is_binary(Query)) andalso is_list(Args) andalso is_integer(Timeout) -> + io:format("~p execute getting connection for pool id ~p~n",[self(), PoolId]), Connection = emysql_conn_mgr:wait_for_connection(PoolId), + io:format("~p execute got connection for pool id ~p: ~p~n",[self(), PoolId, Connection#emysql_connection.id]), monitor_work(Connection, Timeout, {emysql_conn, execute, [Connection, Query, Args]}); execute(PoolId, StmtName, Args, Timeout) when is_atom(StmtName), is_list(Args) andalso is_integer(Timeout) -> @@ -569,7 +571,7 @@ monitor_work(Connection, Timeout, {M,F,A}) when is_record(Connection, emysql_con Pid ! start, receive {'DOWN', Mref, process, Pid, {_, closed}} -> - %-% io:format("monitor_work: ~p DOWN/closed -> renew~n", [Pid]), + io:format("monitor_work: ~p DOWN/closed -> renew~n", [Pid]), %V% case emysql_conn:reset_connection(emysql_conn_mgr:pools(), Connection, keep) of NewConnection when is_record(NewConnection, emysql_connection) -> % re-loop, with new connection. @@ -583,7 +585,7 @@ monitor_work(Connection, Timeout, {M,F,A}) when is_record(Connection, emysql_con %% if the process dies, reset the connection %% and re-throw the error on the current pid. %% catch if re-open fails and also signal it. - %-% io:format("monitor_work: ~p DOWN ~p -> exit~n", [Pid, Reason]), + io:format("monitor_work: ~p DOWN ~p -> exit~n", [Pid, Reason]), %V% case emysql_conn:reset_connection(emysql_conn_mgr:pools(), Connection, pass) of {error,FailedReset} -> exit({Reason, {and_conn_reset_failed, FailedReset}}); @@ -593,14 +595,14 @@ monitor_work(Connection, Timeout, {M,F,A}) when is_record(Connection, emysql_con %% if the process returns data, unlock the %% connection and collect the normal 'DOWN' %% message send from the child process - %-% io:format("monitor_work: ~p got result -> demonitor, unlock connection, return result~n", [Pid]), + io:format("monitor_work: ~p got result -> demonitor ~p, unlock connection ~p, return result~n", [Pid, Mref, Connection#emysql_connection.id]), %V% erlang:demonitor(Mref, [flush]), emysql_conn_mgr:pass_connection(Connection), Result after Timeout -> %% if we timeout waiting for the process to return, %% then reset the connection and throw a timeout error - %-% io:format("monitor_work: ~p TIMEOUT -> demonitor, reset connection, exit~n", [Pid]), + io:format("monitor_work: ~p TIMEOUT -> demonitor, reset connection, exit~n", [Pid]), %V% erlang:demonitor(Mref), case emysql_conn:reset_connection(emysql_conn_mgr:pools(), Connection, pass) of {error, FailedReset} -> diff --git a/src/emysql_conn.erl b/src/emysql_conn.erl index 65b72f42..0f0ab3d5 100644 --- a/src/emysql_conn.erl +++ b/src/emysql_conn.erl @@ -45,7 +45,7 @@ set_encoding(Connection, Encoding) -> emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0). execute(Connection, Query, []) when is_list(Query); is_binary(Query) -> - %-% io:format("~n~p~n", [iolist_to_binary(Query)]), + io:format("~p execute: ~p using connection: ~p~n", [self(), iolist_to_binary(Query), Connection#emysql_connection.id]), %V% Packet = <>, % Packet = <>, emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0); @@ -102,8 +102,8 @@ unprepare(Connection, Name) -> emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0). open_n_connections(PoolId, N) -> - %-% io:format("open ~p connections for pool ~p~n", [N, PoolId]), - case emysql_conn_mgr:find_pool(PoolId, emysql_conn_mgr:pools(), []) of + io:format("open ~p connections for pool ~p~n", [N, PoolId]), %V% + case emysql_conn_mgr:find_pool(PoolId, emysql_conn_mgr:pools()) of {Pool, _} -> [open_connection(Pool) || _ <- lists:seq(1, N)]; _ -> @@ -111,29 +111,32 @@ open_n_connections(PoolId, N) -> end. open_connections(Pool) -> - %-% io:format("open connections (loop func)~n"), + io:format("open connections loop: .. "), %V% case (queue:len(Pool#pool.available) + gb_trees:size(Pool#pool.locked)) < Pool#pool.size of true -> + io:format(" continues~n"), %V% Conn = emysql_conn:open_connection(Pool), + io:format("opened connection: ~p~n", [Conn]), %V% open_connections(Pool#pool{available = queue:in(Conn, Pool#pool.available)}); false -> + io:format(" done~n"), %V% Pool end. open_connection(#pool{pool_id=PoolId, host=Host, port=Port, user=User, password=Password, database=Database, encoding=Encoding}) -> - %-% io:format("~p open connection for pool ~p host ~p port ~p user ~p base ~p~n", [self(), PoolId, Host, Port, User, Database]), - %-% io:format("~p open connection: ... connect ... ~n", [self()]), + io:format("~p open connection for pool ~p host ~p port ~p user ~p base ~p~n", [self(), PoolId, Host, Port, User, Database]), %V% + io:format("~p open connection: ... connect ... ~n", [self()]), %V% case gen_tcp:connect(Host, Port, [binary, {packet, raw}, {active, false}]) of {ok, Sock} -> - %-% io:format("~p open connection: ... got socket~n", [self()]), + io:format("~p open connection: ... got socket~n", [self()]), %V% Mgr = whereis(emysql_conn_mgr), Mgr /= undefined orelse exit({failed_to_find_conn_mgr, "Failed to find conn mgr when opening connection. Make sure crypto is started and emysql.app is in the Erlang path."}), gen_tcp:controlling_process(Sock, Mgr), - %-% io:format("~p open connection: ... greeting~n", [self()]), + io:format("~p open connection: ... greeting~n", [self()]), %V% Greeting = emysql_auth:do_handshake(Sock, User, Password), - %-% io:format("~p open connection: ... make new connection~n", [self()]), + io:format("~p open connection: ... make new connection~n", [self()]), %V% Connection = #emysql_connection{ id = erlang:port_to_list(Sock), pool_id = PoolId, @@ -143,30 +146,30 @@ open_connection(#pool{pool_id=PoolId, host=Host, port=Port, user=User, password= caps = Greeting#greeting.caps, language = Greeting#greeting.language }, - %-% io:format("~p open connection: ... set db ...~n", [self()]), + io:format("~p open connection: ... set db ...~n", [self()]), %V% case emysql_conn:set_database(Connection, Database) of OK1 when is_record(OK1, ok_packet) -> - %-% io:format("~p open connection: ... db set ok~n", [self()]), + io:format("~p open connection: ... db set ok~n", [self()]), %V% ok; Err1 when is_record(Err1, error_packet) -> - %-% io:format("~p open connection: ... db set error~n", [self()]), + io:format("~p open connection: ... db set error~n", [self()]), %V% exit({failed_to_set_database, Err1#error_packet.msg}) end, - %-% io:format("~p open connection: ... set encoding ...~n", [self()]), + io:format("~p open connection: ... set encoding ...~n", [self()]), %V% case emysql_conn:set_encoding(Connection, Encoding) of OK2 when is_record(OK2, ok_packet) -> ok; Err2 when is_record(Err2, error_packet) -> exit({failed_to_set_encoding, Err2#error_packet.msg}) end, - %-% io:format("~p open connection: ... ok, return connection~n", [self()]), + io:format("~p open connection: ... ok, return connection~n", [self()]), %V% Connection; {error, Reason} -> - %-% io:format("~p open connection: ... ERROR ~p~n", [self(), Reason]), - %-% io:format("~p open connection: ... exit with failed_to_connect_to_database~n", [self()]), + io:format("~p open connection: ... ERROR ~p~n", [self(), Reason]), %V% + io:format("~p open connection: ... exit with failed_to_connect_to_database~n", [self()]), %V% exit({failed_to_connect_to_database, Reason}); What -> - %-% io:format("~p open connection: ... UNKNOWN ERROR ~p~n", [self(), What]), + io:format("~p open connection: ... UNKNOWN ERROR ~p~n", [self(), What]), %V% exit({unknown_fail, What}) end. @@ -178,26 +181,26 @@ reset_connection(Pools, Conn, StayLocked) -> %% we queue the old as available for the next try %% by the next caller process coming along. So the %% pool can't run dry, even though it can freeze. - %-% io:format("resetting connection~n"), - %-% io:format("spawn process to close connection~n"), + io:format("resetting connection~n"), %V% + io:format("spawn process to close connection~n"), %V% spawn(fun() -> close_connection(Conn) end), %% OPEN NEW SOCKET - case emysql_conn_mgr:find_pool(Conn#emysql_connection.pool_id, Pools, []) of + case emysql_conn_mgr:find_pool(Conn#emysql_connection.pool_id, Pools) of {Pool, _} -> - %-% io:format("... open new connection to renew~n"), + io:format("... open new connection to renew~n"), %V% case catch open_connection(Pool) of NewConn when is_record(NewConn, emysql_connection) -> - %-% io:format("... got it, replace old (~p)~n", [StayLocked]), + io:format("... got it, replace old (~p)~n", [StayLocked]), %V% case StayLocked of pass -> emysql_conn_mgr:replace_connection_as_available(Conn, NewConn); keep -> emysql_conn_mgr:replace_connection_as_locked(Conn, NewConn) end, - %-% io:format("... done, return new connection~n"), + io:format("... done, return new connection~n"), %V% NewConn; Error -> DeadConn = Conn#emysql_connection{alive=false}, emysql_conn_mgr:replace_connection_as_available(Conn, DeadConn), - %-% io:format("... failed to re-open. Shelving dead connection as available.~n"), + io:format("... failed to re-open. Shelving dead connection as available.~n"), %V% {error, {cannot_reopen_in_reset, Error}} end; undefined -> diff --git a/src/emysql_conn_mgr.erl b/src/emysql_conn_mgr.erl index d181837e..6e2e610c 100644 --- a/src/emysql_conn_mgr.erl +++ b/src/emysql_conn_mgr.erl @@ -28,16 +28,16 @@ -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2]). -export([terminate/2, code_change/3]). --export([pools/0, waiting/0, add_pool/1, remove_pool/1, +-export([pools/0, add_pool/1, remove_pool/1, add_connections/2, remove_connections/2, lock_connection/1, wait_for_connection/1, pass_connection/1, replace_connection_as_available/2, replace_connection_as_locked/2, - find_pool/3]). + find_pool/2]). -include("emysql.hrl"). --record(state, {pools, waiting=queue:new()}). +-record(state, {pools}). %%==================================================================== %% API @@ -52,9 +52,6 @@ start_link() -> pools() -> gen_server:call(?MODULE, pools, infinity). -waiting() -> - gen_server:call(?MODULE, waiting, infinity). - add_pool(Pool) -> do_gen_call({add_pool, Pool}). @@ -76,18 +73,18 @@ wait_for_connection(PoolId)-> %-% io:format("~p waits for connection to pool ~p~n", [self(), PoolId]), case lock_connection(PoolId) of unavailable -> - %-% io:format("~p is queued~n", [self()]), - gen_server:call(?MODULE, start_wait, infinity), + io:format("~p is queued~n", [self()]), %V% + gen_server:call(?MODULE, {start_wait, PoolId}, infinity), receive {connection, Connection} -> - %-% io:format("~p gets a connection after waiting in queue~n", [self()]), + io:format("~p gets a connection after waiting in queue~n", [self()]), %V% Connection after lock_timeout() -> - %-% io:format("~p gets no connection and times out -> EXIT~n~n", [self()]), + io:format("~p gets no connection and times out -> EXIT~n~n", [self()]), %V% exit(connection_lock_timeout) end; Connection -> - %-% io:format("~p gets connection~n", [self()]), + io:format("~p gets connection~n", [self()]), %V% Connection end. @@ -140,11 +137,8 @@ init([]) -> handle_call(pools, _From, State) -> {reply, State#state.pools, State}; -handle_call(waiting, _From, State) -> - {reply, State#state.waiting, State}; - handle_call({add_pool, Pool}, _From, State) -> - case find_pool(Pool#pool.pool_id, State#state.pools, []) of + case find_pool(Pool#pool.pool_id, State#state.pools) of {_, _} -> {reply, {error, pool_already_exists}, State}; undefined -> @@ -152,7 +146,7 @@ handle_call({add_pool, Pool}, _From, State) -> end; handle_call({remove_pool, PoolId}, _From, State) -> - case find_pool(PoolId, State#state.pools, []) of + case find_pool(PoolId, State#state.pools) of {Pool, OtherPools} -> {reply, Pool, State#state{pools=OtherPools}}; undefined -> @@ -160,7 +154,7 @@ handle_call({remove_pool, PoolId}, _From, State) -> end; handle_call({add_connections, PoolId, Conns}, _From, State) -> - case find_pool(PoolId, State#state.pools, []) of + case find_pool(PoolId, State#state.pools) of {Pool, OtherPools} -> OtherConns = Pool#pool.available, State1 = State#state{ @@ -172,7 +166,7 @@ handle_call({add_connections, PoolId, Conns}, _From, State) -> end; handle_call({remove_connections, PoolId, Num}, _From, State) -> - case find_pool(PoolId, State#state.pools, []) of + case find_pool(PoolId, State#state.pools) of {Pool, OtherPools} -> case Num > queue:len(Pool#pool.available) of true -> @@ -187,12 +181,15 @@ handle_call({remove_connections, PoolId, Num}, _From, State) -> {reply, {error, pool_not_found}, State} end; -handle_call(start_wait, {From, _Mref}, State) -> - %% place to calling pid at the end of the waiting queue - State1 = State#state{ - waiting = queue:in(From, State#state.waiting) - }, - {reply, ok, State1}; +handle_call({start_wait, PoolId}, {From, _Mref}, State) -> + %% place to calling pid at the end of the waiting queue of its pool + case find_pool(PoolId, State#state.pools) of + {Pool, OtherPools} -> + PoolNow = Pool#pool{ waiting = queue:in(From, Pool#pool.waiting) }, + {reply, ok, State#state{pools=[PoolNow|OtherPools]}}; + undefined -> + {reply, {error, pool_not_found}, State} + end; handle_call({lock_connection, PoolId}, _From, State) -> %% find the next available connection in the pool identified by PoolId @@ -210,7 +207,7 @@ handle_call({lock_connection, PoolId}, _From, State) -> end; handle_call({pass_connection, Connection}, _From, State) -> - {Result, State1} = pass_on_or_queue_as_available(State, Connection, State#state.waiting), + {Result, State1} = pass_on_or_queue_as_available(State, Connection), {reply, Result, State1}; handle_call({replace_connection_as_available, OldConn, NewConn}, _From, State) -> @@ -222,7 +219,7 @@ handle_call({replace_connection_as_available, OldConn, NewConn}, _From, State) - %% passed in to serve as the replacement for the old one. %% But i.e. if the sql server is down, it can be fed a dead %% old connection as new connection, to preserve the pool size. - case find_pool(OldConn#emysql_connection.pool_id, State#state.pools, []) of + case find_pool(OldConn#emysql_connection.pool_id, State#state.pools) of {Pool, OtherPools} -> Pool1 = Pool#pool{ available = queue:in(NewConn, Pool#pool.available), @@ -237,7 +234,7 @@ handle_call({replace_connection_as_locked, OldConn, NewConn}, _From, State) -> %% replace an existing, locked condition with the newly supplied one %% and keep it in the locked list so that the caller can continue to use it %% without having to lock another connection. - case find_pool(OldConn#emysql_connection.pool_id, State#state.pools, []) of + case find_pool(OldConn#emysql_connection.pool_id, State#state.pools) of {Pool, OtherPools} -> LockedStripped = gb_trees:delete_any(OldConn#emysql_connection.id, Pool#pool.locked), LockedAdded = gb_trees:enter(NewConn#emysql_connection.id, NewConn, LockedStripped), @@ -304,6 +301,9 @@ initialize_pools() -> } || {PoolId, Props} <- emysql_app:pools() ]. +find_pool(PoolId, Pools) -> + find_pool(PoolId, Pools, []). + find_pool(_, [], _) -> undefined; find_pool(PoolId, [#pool{pool_id = PoolId} = Pool|Tail], OtherPools) -> @@ -313,7 +313,7 @@ find_pool(PoolId, [Pool|Tail], OtherPools) -> find_pool(PoolId, Tail, [Pool|OtherPools]). find_next_connection_in_pool(Pools, PoolId) -> - case find_pool(PoolId, Pools, []) of + case find_pool(PoolId, Pools) of {Pool, OtherPools} -> % check no of connection in Pool %-% io:format("~p Pool ~p Connections available: ~p~n", [self(), PoolId, queue:len(Pool#pool.available)]), @@ -329,45 +329,60 @@ find_next_connection_in_pool(Pools, PoolId) -> end. %% This function does not wait, but may loop over the queue. -pass_on_or_queue_as_available(State, Connection, Waiting) -> - %% check if any processes are waiting for a connection - case queue:is_empty(Waiting) of - true -> - %% if no processes are waiting then unlock the connection - case find_pool(Connection#emysql_connection.pool_id, State#state.pools, []) of - {Pool, OtherPools} -> - %% find connection in locked tree - case gb_trees:lookup(Connection#emysql_connection.id, Pool#pool.locked) of - {value, Conn} -> - %%% - %% add it to the available queue and remove from locked tree - Pool1 = Pool#pool{ - available = queue:in(Conn#emysql_connection{locked_at=undefined}, Pool#pool.available), - locked = gb_trees:delete_any(Connection#emysql_connection.id, Pool#pool.locked) - }, - {ok, State#state{pools = [Pool1|OtherPools]}}; - %%% - none -> - {{error, connection_not_found}, State} - end; - undefined -> - {{error, pool_not_found}, State} - end; - false -> - %% if the waiting queue is not empty then remove the head of - %% the queue and check if that process is still waiting - %% for a connection. If so, send the connection. Regardless, - %% update the queue in state once the head has been removed. - {{value, Pid}, Waiting1} = queue:out(Waiting), - case erlang:process_info(Pid, current_function) of - {current_function,{emysql_conn_mgr,wait_for_connection,1}} -> - erlang:send(Pid, {connection, Connection}), - {ok, State#state{waiting = Waiting1}}; - _ -> - % loop, to traverse queue to find a sane candidate, until empty. - pass_on_or_queue_as_available(State, Connection, Waiting1) - end - end. +pass_on_or_queue_as_available(State, Connection) -> + + % get the pool that this connection belongs to + case find_pool(Connection#emysql_connection.pool_id, State#state.pools) of + + {Pool, OtherPools} -> + + %% check if any processes are waiting for a connection + Waiting = Pool#pool.waiting, + case queue:is_empty(Waiting) of + + %% if no processes are waiting then unlock the connection + true -> + + %% find connection in locked tree + case gb_trees:lookup(Connection#emysql_connection.id, Pool#pool.locked) of + + {value, Conn} -> + + %% add the connection to the 'available' queue and remove from 'locked' tree + Pool1 = Pool#pool{ + available = queue:in(Conn#emysql_connection{locked_at=undefined}, Pool#pool.available), + locked = gb_trees:delete_any(Connection#emysql_connection.id, Pool#pool.locked) + }, + {ok, State#state{pools = [Pool1|OtherPools]}}; + + none -> + {{error, connection_not_found}, State} + end; + + %% if the waiting queue is not empty then remove the head of + %% the queue and check if that process is still waiting + %% for a connection. If so, send the connection. Regardless, + %% update the pool & queue in state once the head has been removed. + false -> + + {{value, Pid}, OtherWaiting} = queue:out(Waiting), + PoolNow = Pool#pool{ waiting = OtherWaiting }, + StateNow = State#state{ pools = [PoolNow|OtherPools] }, + + case erlang:process_info(Pid, current_function) of + {current_function,{emysql_conn_mgr,wait_for_connection,1}} -> + erlang:send(Pid, {connection, Connection}), + {ok, StateNow}; + _ -> + % loop, to traverse queue to find a healthy candidate, until empty. + pass_on_or_queue_as_available(StateNow, Connection) + end + end; + + %% pool not found + undefined -> + {{error, pool_not_found}, State} + end. lock_timeout() -> emysql_app:lock_timeout().