Skip to content

Commit

Permalink
added wait queues per pool - fix issue 20
Browse files Browse the repository at this point in the history
  • Loading branch information
Henning committed Feb 1, 2012
1 parent 99f1616 commit 88a9bb6
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 98 deletions.
2 changes: 1 addition & 1 deletion include/emysql.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
%% OTHER DEALINGS IN THE SOFTWARE.


-record(pool, {pool_id, size, user, password, host, port, database, encoding, available=queue:new(), locked=gb_trees:empty()}).
-record(pool, {pool_id, size, user, password, host, port, database, encoding, available=queue:new(), locked=gb_trees:empty(), waiting=queue:new()}).
-record(emysql_connection, {id, pool_id, socket, version, thread_id, caps, language, prepared=gb_trees:empty(), locked_at, alive=true}).
-record(greeting, {protocol_version, server_version, thread_id, salt1, salt2, caps, caps_high, language, status, seq_num, plugin}).
-record(field, {seq_num, catalog, db, table, org_table, name, org_name, type, default, charset_nr, length, flags, decimals}).
Expand Down
14 changes: 8 additions & 6 deletions src/emysql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ decrement_pool_size(PoolId, Num) when is_integer(Num) ->
%%
%% Result = emysql:execute(hello_pool, hello_stmt, ["Hello%"]),
%%
%% %-% io:format("~n~s~n", [string:chars($-,72)]),
%% %-% io:format("~p~n", [Result]),
%% io:format("~n~s~n", [string:chars($-,72)]), %V%
%% io:format("~p~n", [Result]), %V%
%%
%% ok.
%% '''
Expand Down Expand Up @@ -465,7 +465,9 @@ execute(PoolId, StmtName, Timeout) when is_atom(StmtName), is_integer(Timeout) -
%%

execute(PoolId, Query, Args, Timeout) when (is_list(Query) orelse is_binary(Query)) andalso is_list(Args) andalso is_integer(Timeout) ->
io:format("~p execute getting connection for pool id ~p~n",[self(), PoolId]),
Connection = emysql_conn_mgr:wait_for_connection(PoolId),
io:format("~p execute got connection for pool id ~p: ~p~n",[self(), PoolId, Connection#emysql_connection.id]),
monitor_work(Connection, Timeout, {emysql_conn, execute, [Connection, Query, Args]});

execute(PoolId, StmtName, Args, Timeout) when is_atom(StmtName), is_list(Args) andalso is_integer(Timeout) ->
Expand Down Expand Up @@ -569,7 +571,7 @@ monitor_work(Connection, Timeout, {M,F,A}) when is_record(Connection, emysql_con
Pid ! start,
receive
{'DOWN', Mref, process, Pid, {_, closed}} ->
%-% io:format("monitor_work: ~p DOWN/closed -> renew~n", [Pid]),
io:format("monitor_work: ~p DOWN/closed -> renew~n", [Pid]), %V%
case emysql_conn:reset_connection(emysql_conn_mgr:pools(), Connection, keep) of
NewConnection when is_record(NewConnection, emysql_connection) ->
% re-loop, with new connection.
Expand All @@ -583,7 +585,7 @@ monitor_work(Connection, Timeout, {M,F,A}) when is_record(Connection, emysql_con
%% if the process dies, reset the connection
%% and re-throw the error on the current pid.
%% catch if re-open fails and also signal it.
%-% io:format("monitor_work: ~p DOWN ~p -> exit~n", [Pid, Reason]),
io:format("monitor_work: ~p DOWN ~p -> exit~n", [Pid, Reason]), %V%
case emysql_conn:reset_connection(emysql_conn_mgr:pools(), Connection, pass) of
{error,FailedReset} ->
exit({Reason, {and_conn_reset_failed, FailedReset}});
Expand All @@ -593,14 +595,14 @@ monitor_work(Connection, Timeout, {M,F,A}) when is_record(Connection, emysql_con
%% if the process returns data, unlock the
%% connection and collect the normal 'DOWN'
%% message send from the child process
%-% io:format("monitor_work: ~p got result -> demonitor, unlock connection, return result~n", [Pid]),
io:format("monitor_work: ~p got result -> demonitor ~p, unlock connection ~p, return result~n", [Pid, Mref, Connection#emysql_connection.id]), %V%
erlang:demonitor(Mref, [flush]),
emysql_conn_mgr:pass_connection(Connection),
Result
after Timeout ->
%% if we timeout waiting for the process to return,
%% then reset the connection and throw a timeout error
%-% io:format("monitor_work: ~p TIMEOUT -> demonitor, reset connection, exit~n", [Pid]),
io:format("monitor_work: ~p TIMEOUT -> demonitor, reset connection, exit~n", [Pid]), %V%
erlang:demonitor(Mref),
case emysql_conn:reset_connection(emysql_conn_mgr:pools(), Connection, pass) of
{error, FailedReset} ->
Expand Down
51 changes: 27 additions & 24 deletions src/emysql_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ set_encoding(Connection, Encoding) ->
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0).

execute(Connection, Query, []) when is_list(Query); is_binary(Query) ->
%-% io:format("~n~p~n", [iolist_to_binary(Query)]),
io:format("~p execute: ~p using connection: ~p~n", [self(), iolist_to_binary(Query), Connection#emysql_connection.id]), %V%
Packet = <<?COM_QUERY, (emysql_util:any_to_binary(Query))/binary>>,
% Packet = <<?COM_QUERY, (iolist_to_binary(Query))/binary>>,
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0);
Expand Down Expand Up @@ -102,38 +102,41 @@ unprepare(Connection, Name) ->
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0).

open_n_connections(PoolId, N) ->
%-% io:format("open ~p connections for pool ~p~n", [N, PoolId]),
case emysql_conn_mgr:find_pool(PoolId, emysql_conn_mgr:pools(), []) of
io:format("open ~p connections for pool ~p~n", [N, PoolId]), %V%
case emysql_conn_mgr:find_pool(PoolId, emysql_conn_mgr:pools()) of
{Pool, _} ->
[open_connection(Pool) || _ <- lists:seq(1, N)];
_ ->
exit(pool_not_found)
end.

open_connections(Pool) ->
%-% io:format("open connections (loop func)~n"),
io:format("open connections loop: .. "), %V%
case (queue:len(Pool#pool.available) + gb_trees:size(Pool#pool.locked)) < Pool#pool.size of
true ->
io:format(" continues~n"), %V%
Conn = emysql_conn:open_connection(Pool),
io:format("opened connection: ~p~n", [Conn]), %V%
open_connections(Pool#pool{available = queue:in(Conn, Pool#pool.available)});
false ->
io:format(" done~n"), %V%
Pool
end.

open_connection(#pool{pool_id=PoolId, host=Host, port=Port, user=User, password=Password, database=Database, encoding=Encoding}) ->
%-% io:format("~p open connection for pool ~p host ~p port ~p user ~p base ~p~n", [self(), PoolId, Host, Port, User, Database]),
%-% io:format("~p open connection: ... connect ... ~n", [self()]),
io:format("~p open connection for pool ~p host ~p port ~p user ~p base ~p~n", [self(), PoolId, Host, Port, User, Database]), %V%
io:format("~p open connection: ... connect ... ~n", [self()]), %V%
case gen_tcp:connect(Host, Port, [binary, {packet, raw}, {active, false}]) of
{ok, Sock} ->
%-% io:format("~p open connection: ... got socket~n", [self()]),
io:format("~p open connection: ... got socket~n", [self()]), %V%
Mgr = whereis(emysql_conn_mgr),
Mgr /= undefined orelse
exit({failed_to_find_conn_mgr,
"Failed to find conn mgr when opening connection. Make sure crypto is started and emysql.app is in the Erlang path."}),
gen_tcp:controlling_process(Sock, Mgr),
%-% io:format("~p open connection: ... greeting~n", [self()]),
io:format("~p open connection: ... greeting~n", [self()]), %V%
Greeting = emysql_auth:do_handshake(Sock, User, Password),
%-% io:format("~p open connection: ... make new connection~n", [self()]),
io:format("~p open connection: ... make new connection~n", [self()]), %V%
Connection = #emysql_connection{
id = erlang:port_to_list(Sock),
pool_id = PoolId,
Expand All @@ -143,30 +146,30 @@ open_connection(#pool{pool_id=PoolId, host=Host, port=Port, user=User, password=
caps = Greeting#greeting.caps,
language = Greeting#greeting.language
},
%-% io:format("~p open connection: ... set db ...~n", [self()]),
io:format("~p open connection: ... set db ...~n", [self()]), %V%
case emysql_conn:set_database(Connection, Database) of
OK1 when is_record(OK1, ok_packet) ->
%-% io:format("~p open connection: ... db set ok~n", [self()]),
io:format("~p open connection: ... db set ok~n", [self()]), %V%
ok;
Err1 when is_record(Err1, error_packet) ->
%-% io:format("~p open connection: ... db set error~n", [self()]),
io:format("~p open connection: ... db set error~n", [self()]), %V%
exit({failed_to_set_database, Err1#error_packet.msg})
end,
%-% io:format("~p open connection: ... set encoding ...~n", [self()]),
io:format("~p open connection: ... set encoding ...~n", [self()]), %V%
case emysql_conn:set_encoding(Connection, Encoding) of
OK2 when is_record(OK2, ok_packet) ->
ok;
Err2 when is_record(Err2, error_packet) ->
exit({failed_to_set_encoding, Err2#error_packet.msg})
end,
%-% io:format("~p open connection: ... ok, return connection~n", [self()]),
io:format("~p open connection: ... ok, return connection~n", [self()]), %V%
Connection;
{error, Reason} ->
%-% io:format("~p open connection: ... ERROR ~p~n", [self(), Reason]),
%-% io:format("~p open connection: ... exit with failed_to_connect_to_database~n", [self()]),
io:format("~p open connection: ... ERROR ~p~n", [self(), Reason]), %V%
io:format("~p open connection: ... exit with failed_to_connect_to_database~n", [self()]), %V%
exit({failed_to_connect_to_database, Reason});
What ->
%-% io:format("~p open connection: ... UNKNOWN ERROR ~p~n", [self(), What]),
io:format("~p open connection: ... UNKNOWN ERROR ~p~n", [self(), What]), %V%
exit({unknown_fail, What})
end.

Expand All @@ -178,26 +181,26 @@ reset_connection(Pools, Conn, StayLocked) ->
%% we queue the old as available for the next try
%% by the next caller process coming along. So the
%% pool can't run dry, even though it can freeze.
%-% io:format("resetting connection~n"),
%-% io:format("spawn process to close connection~n"),
io:format("resetting connection~n"), %V%
io:format("spawn process to close connection~n"), %V%
spawn(fun() -> close_connection(Conn) end),
%% OPEN NEW SOCKET
case emysql_conn_mgr:find_pool(Conn#emysql_connection.pool_id, Pools, []) of
case emysql_conn_mgr:find_pool(Conn#emysql_connection.pool_id, Pools) of
{Pool, _} ->
%-% io:format("... open new connection to renew~n"),
io:format("... open new connection to renew~n"), %V%
case catch open_connection(Pool) of
NewConn when is_record(NewConn, emysql_connection) ->
%-% io:format("... got it, replace old (~p)~n", [StayLocked]),
io:format("... got it, replace old (~p)~n", [StayLocked]), %V%
case StayLocked of
pass -> emysql_conn_mgr:replace_connection_as_available(Conn, NewConn);
keep -> emysql_conn_mgr:replace_connection_as_locked(Conn, NewConn)
end,
%-% io:format("... done, return new connection~n"),
io:format("... done, return new connection~n"), %V%
NewConn;
Error ->
DeadConn = Conn#emysql_connection{alive=false},
emysql_conn_mgr:replace_connection_as_available(Conn, DeadConn),
%-% io:format("... failed to re-open. Shelving dead connection as available.~n"),
io:format("... failed to re-open. Shelving dead connection as available.~n"), %V%
{error, {cannot_reopen_in_reset, Error}}
end;
undefined ->
Expand Down
Loading

0 comments on commit 88a9bb6

Please sign in to comment.