Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TURN support #30

Merged
merged 4 commits into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ mix test.watch
mix release production
```

**Options:**

To use a Turn Server, generate a secret key string, e.g. via `openssl rand -base64 30` and set:
```
export SIGNALTOWER_TURN_SECRET=<generated_secret_key>
```
The same secret key must be configured in the turn server.
For example for coturn, use the following configuration in turnserver.conf:
```
use-auth-secret
static-auth-secret=<generated_secret_key>
```

By default, the websocket port 4233 is used, you can change it via:
```
export SIGNALTOWER_PORT=1234
Expand All @@ -47,6 +60,8 @@ By default, the websocket is bound to all interfaces (0.0.0.0), you can also bin
export SIGNALTOWER_LOCALHOST
```

## References

[palava protocol]: https://github.com/palavatv/palava-client/wiki/Protocol
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change in the protocol should be documented.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about making an official release after this and writing it in the CHANGELOG?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue to track: palavatv/palava-client#32

[palava client]: https://github.com/palavatv/palava-client/
[palava project]: https://github.com/palavatv/palava
82 changes: 55 additions & 27 deletions lib/signal_tower/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ defmodule SignalTower.Room do
alias SignalTower.Room.{Member, Membership, Supervisor}
alias SignalTower.Stats

# a turn token is valid for three hours
@turn_validity_period 3 * 60 * 60

## API ##

def start_link(room_id) do
name = "room_#{room_id}" |> String.to_atom()
GenServer.start_link(__MODULE__, room_id, name: name)
GenServer.start_link(__MODULE__, :ok, name: name)
end

def create(room_id) do
Expand All @@ -19,32 +22,36 @@ defmodule SignalTower.Room do
end
end

def join_and_monitor(room_id, status) do
def join_and_monitor(room_id, status, turn_token_expiry) do
room_pid = create(room_id)
Process.monitor(room_pid)
own_id = GenServer.call(room_pid, {:join, self(), status})
%Membership{id: room_id, pid: room_pid, own_id: own_id, own_status: status}

{own_id, new_turn_token_expiry} =
GenServer.call(room_pid, {:join, self(), status, turn_token_expiry})

membership = %Membership{id: room_id, pid: room_pid, own_id: own_id, own_status: status}
%{room: membership, turn_token_expiry: new_turn_token_expiry}
end

## Callbacks ##

@impl GenServer
def init(room_id) do
def init(_) do
GenServer.cast(Stats, {:room_created, self()})
{:ok, {room_id, %{}}}
{:ok, %{}}
end

@impl GenServer
def handle_call({:join, pid, status}, _, {room_id, members}) do
def handle_call({:join, pid, status, turn_token_expiry}, _, members) do
GenServer.cast(Stats, {:peer_joined, self(), map_size(members) + 1})

Process.monitor(pid)
peer_id = UUID.uuid1()
send_joined_room(pid, peer_id, members)
new_turn_token_expiry = send_joined_room(pid, peer_id, members, turn_token_expiry)
send_new_peer(members, peer_id, status)

new_member = %Member{peer_id: peer_id, pid: pid, status: status}
{:reply, peer_id, {room_id, Map.put(members, peer_id, new_member)}}
{:reply, {peer_id, new_turn_token_expiry}, Map.put(members, peer_id, new_member)}
end

@impl GenServer
Expand All @@ -62,16 +69,16 @@ defmodule SignalTower.Room do
end

@impl GenServer
def handle_cast({:send_to_peer, peer_id, msg, sender_id}, state = {_, members}) do
def handle_cast({:send_to_peer, peer_id, msg, sender_id}, members) do
if members[sender_id] && members[peer_id] do
send(members[peer_id].pid, {:to_user, Map.put(msg, :sender_id, sender_id)})
end

{:noreply, state}
{:noreply, members}
end

@impl GenServer
def handle_cast({:update_status, sender_id, status}, state = {_, members}) do
def handle_cast({:update_status, sender_id, status}, members) do
if members[sender_id] do
update_status = %{
event: "peer_updated_status",
Expand All @@ -83,41 +90,41 @@ defmodule SignalTower.Room do
|> send_to_all(update_status)
end

{:noreply, state}
{:noreply, members}
end

# invoked when a user session exits
@impl GenServer
def handle_info({:DOWN, _ref, _, pid, _}, state = {_, members}) do
def handle_info({:DOWN, _ref, _, pid, _}, members) do
members
|> Enum.find(fn {_, member} -> pid == member.pid end)
|> case do
{id, _} ->
case leave(id, state) do
case leave(id, members) do
{:ok, state} -> {:noreply, state}
{:error, state} -> {:noreply, state}
{:stop, state} -> {:stop, :normal, state}
end

_ ->
{:noreply, state}
{:noreply, members}
end
end

defp leave(peer_id, state = {room_id, members}) do
defp leave(peer_id, members) do
if members[peer_id] do
GenServer.cast(Stats, {:peer_left, self()})
next_members = Map.delete(members, peer_id)

if map_size(next_members) > 0 do
send_peer_left(next_members, peer_id)
{:ok, {room_id, next_members}}
{:ok, next_members}
else
GenServer.cast(Stats, {:room_closed, self()})
{:stop, {room_id, next_members}}
{:stop, next_members}
end
else
{:error, state}
{:error, members}
end
end

Expand All @@ -128,14 +135,35 @@ defmodule SignalTower.Room do
end)
end

defp send_joined_room(pid, peer_id, members) do
response_for_joined_peer = %{
event: "joined_room",
own_id: peer_id,
peers: members |> Map.values()
}
defp send_joined_room(pid, own_id, members, turn_token_expiry) do
now = System.os_time(:second)

{turn_response, next_turn_token_expiry} =
if System.get_env("SIGNALTOWER_TURN_SECRET") && turn_token_expiry < now do
next_expiry = now + @turn_validity_period
user = to_string(next_expiry) <> ":" <> own_id
secret = System.get_env("SIGNALTOWER_TURN_SECRET")

response = %{
turn_user: user,
turn_password:
:crypto.mac(:hmac, :sha, to_charlist(secret), to_charlist(user)) |> Base.encode64()
}

{response, next_expiry}
else
{%{}, turn_token_expiry}
end

joined_response =
Map.merge(turn_response, %{
event: "joined_room",
own_id: own_id,
peers: members |> Map.values()
})

send(pid, {:to_user, response_for_joined_peer})
send(pid, {:to_user, joined_response})
next_turn_token_expiry
end

defp send_new_peer(members, peer_id, status) do
Expand Down
42 changes: 23 additions & 19 deletions lib/signal_tower/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,48 +11,48 @@ defmodule SignalTower.Session do
|> (&Process.register(self(), &1)).()
end

def handle_message(msg, room) do
case MsgIntegrity.check(msg, room) do
def handle_message(msg, state) do
case MsgIntegrity.check(msg, state.room) do
{:ok, msg} ->
incoming_message(msg, room)
incoming_message(msg, state)

{:error, error} ->
send_error(error, msg)
room
state
end
end

defp incoming_message(msg = %{"event" => "join_room"}, _) do
Room.join_and_monitor(msg["room_id"], msg["status"])
defp incoming_message(msg = %{"event" => "join_room"}, state) do
Room.join_and_monitor(msg["room_id"], msg["status"], state.turn_token_expiry)
end

defp incoming_message(msg = %{"event" => "leave_room"}, room) do
defp incoming_message(msg = %{"event" => "leave_room"}, state = %{room: room}) do
if room do
case GenServer.call(room.pid, {:leave, room.own_id}) do
:ok ->
nil
%{state | room: nil}

:error ->
send_error("You are not currently in a room, so you can not leave it", msg)
room
state
end
else
send_error("You are not currently in a room, so you can not leave it", msg)
room
state
end
end

defp incoming_message(msg = %{"event" => "send_to_peer"}, room) do
defp incoming_message(msg = %{"event" => "send_to_peer"}, state = %{room: room}) do
GenServer.cast(room.pid, {:send_to_peer, msg["peer_id"], msg["data"], room.own_id})
room
state
end

defp incoming_message(msg = %{"event" => "update_status"}, room) do
defp incoming_message(msg = %{"event" => "update_status"}, state = %{room: room}) do
GenServer.cast(room.pid, {:update_status, room.own_id, msg["status"]})
room
state
end

defp incoming_message(%{"event" => "ping"}, room) do
defp incoming_message(%{"event" => "ping"}, state) do
send(
self(),
{:to_user,
Expand All @@ -61,16 +61,20 @@ defmodule SignalTower.Session do
}}
)

room
state
end

# invoked when a room exits
def handle_exit_message(pid, room, status) do
def handle_exit_message(
pid,
status,
state = %{room: room, turn_token_expiry: turn_token_expiry}
) do
if room && pid == room.pid && status != :normal do
# current room died => automatic rejoin
Room.join_and_monitor(room.id, room.own_status)
Room.join_and_monitor(room.id, room.own_status, turn_token_expiry)
else
nil
state
end
end

Expand Down
27 changes: 17 additions & 10 deletions lib/signal_tower/websocket_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,32 @@ defmodule SignalTower.WebsocketHandler do

@impl :cowboy_websocket
def init(req, _state) do
{:cowboy_websocket, req, nil, %{idle_timeout: :timer.seconds(30)}}
initial_state = %{
# room membership
room: nil,
# initialize turn token expiry with 0, it will be properly initialized on first room join
turn_token_expiry: 0
}

{:cowboy_websocket, req, initial_state, %{idle_timeout: :timer.seconds(30)}}
end

@impl :cowboy_websocket
def websocket_init(state) do
def websocket_init(initial_state) do
Session.init()
:timer.send_interval(:timer.seconds(5), :send_ping)
{:ok, state}
{:ok, initial_state}
end

@impl :cowboy_websocket
def websocket_handle({:text, msg}, room) do
def websocket_handle({:text, msg}, state) do
case Poison.decode(msg) do
{:ok, parsed_msg} ->
{:ok, Session.handle_message(parsed_msg, room)}
{:ok, Session.handle_message(parsed_msg, state)}

_ ->
answer = Poison.encode!(%{event: "error", description: "invalid json", received_msg: msg})
{:reply, {:text, answer}, room}
{:reply, {:text, answer}, state}
end
end

Expand Down Expand Up @@ -53,13 +60,13 @@ defmodule SignalTower.WebsocketHandler do
end

@impl :cowboy_websocket
def websocket_info({:DOWN, _, _, pid, status}, room) do
{:ok, Session.handle_exit_message(pid, room, status)}
def websocket_info({:DOWN, _, _, pid, status}, state) do
{:ok, Session.handle_exit_message(pid, status, state)}
end

@impl :cowboy_websocket
def websocket_info(:send_ping, status) do
{:reply, {:ping, "server ping"}, status}
def websocket_info(:send_ping, state) do
{:reply, {:ping, "server ping"}, state}
end

@impl :cowboy_websocket
Expand Down
Loading