Mix.install(
  [
    {:httpoison, "~> 1.8"},
    {:jason, "~> 1.4"},
    {:vega_lite, "~> 0.1.5"},
    {:kino, "~> 0.8.0"},
    {:kino_vega_lite, "~> 0.1.1"},
    {:explorer, "~> 0.4.0"},
    {:axon, "~> 0.3.0"},
    {:exla, "~> 0.4.0"},
    {:nx, "~> 0.4.0"},
    {:table_rex, "~> 3.1"}
  ],
  config: [
    nx: [default_backend: EXLA.Backend]
  ]
)
alias VegaLite, as: Vl
alias Explorer.DataFrame, as: DF
alias Explorer.Series
# Sets the global compilation options
Nx.Defn.global_default_options(compiler: EXLA)
# Sets the process-level compilation options
Nx.Defn.default_options(compiler: EXLA)

# Defining the API calls used to get data; Explorer.DataFrame is required
# so its mutate/filter macros can be used later
require Explorer.DataFrame
defmodule WaterAPI do
  require Logger

  @water_nsw_api_endpoint "https://realtimedata.waternsw.com.au/cgi/webservice.pl"

  def get_station_by_coords(longitude, latitude, radius) do
    %{
      "function" => "get_db_info",
      "version" => 3,
      "params" => %{
        "table_name" => "site",
        "return_type" => "array",
        "filter_values" => %{"active" => "true"},
        "geo_filter" => %{
          "circle" => [longitude, latitude, radius]
        }
      }
    }
    |> Jason.encode!()
    |> post_to_api()
  end
  def get_variable_list(station_ids, opts \\ []) do
    data_source = Keyword.get(opts, :data_source, "A")
    station_ids_as_string = Enum.join(station_ids, ",")

    %{
      "function" => "get_variable_list",
      "version" => 1,
      "params" => %{
        "site_list" => station_ids_as_string,
        "datasource" => data_source
      }
    }
    |> Jason.encode!()
    |> post_to_api()
  end
  def get_ts_trace(station_id, start_time, end_time, var_list, interval, aggregate, opts \\ []) do
    # read the :datasource option so call sites passing `datasource: "CP"` take effect
    datasource = Keyword.get(opts, :datasource, "A")
    multiplier = Keyword.get(opts, :multiplier, "1")
    var_list_string = Enum.join(var_list, ",")

    %{
      function: "get_ts_traces",
      version: 2,
      params: %{
        site_list: station_id,
        datasource: datasource,
        start_time: start_time,
        end_time: end_time,
        var_list: var_list_string,
        interval: interval,
        multiplier: multiplier,
        data_type: aggregate
      }
    }
    |> Jason.encode!()
    |> post_to_api()
    |> case do
      {:ok, data} ->
        data["traces"]
        |> List.first()
        |> Map.get("trace")

      {:error, msg} ->
        msg
    end
  end
  defp post_to_api(payload) do
    HTTPoison.post!(
      @water_nsw_api_endpoint,
      payload,
      [],
      timeout: 50_000,
      recv_timeout: 50_000
    )
    |> then(fn r -> r.body end)
    |> Jason.decode!()
    |> case do
      %{"_return" => data} -> {:ok, data}
      %{"error_msg" => error_msg} -> {:error, error_msg}
    end
  end
end
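A quick, optional sanity check of the wrapper: list the variables recorded at the two stream gauges used further down (the station ids come from the list below).

WaterAPI.get_variable_list(["212250", "212270"])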
defmodule Helper do
  # Rename the q/v columns of each data frame so they carry the station id and variable number
  def rename_cols_by_id(dfs, station_ids, variable_no) do
    for {df, station_id} <- Enum.zip(dfs, station_ids) do
      DF.rename(
        df,
        q: "q_#{station_id}_#{variable_no}",
        v: "v_#{station_id}_#{variable_no}"
      )
    end
  end

  # Inner-join a list of data frames on their shared columns
  def join_dfs(dfs) when is_list(dfs) do
    Enum.reduce(dfs, fn df, acc ->
      DF.join(df, acc, how: :inner)
    end)
  end

  # Shuffle the rows and split them into two data frames at the given fraction
  def split_data(df, decimal) do
    row_no = DF.n_rows(df)

    {first, second} =
      df
      |> DF.to_rows()
      |> Enum.shuffle()
      |> Enum.split(round(decimal * row_no))

    {DF.new(first), DF.new(second)}
  end

  # Produce a stream of {feature_batch, label_batch} tensor pairs
  def df_to_batches(df, feature_cols, label_col, batch_size \\ 1) do
    Stream.zip(
      df_to_tensor_batches(df[feature_cols], batch_size),
      df_to_tensor_batches(df[[label_col]], batch_size)
    )
  end

  def df_to_tensor_batches(df, batch_size) do
    df
    |> df_to_tensor()
    |> Nx.shuffle(axis: 0)
    |> Nx.to_batched(batch_size, leftover: :discard)
  end

  # Stack each column of the data frame into one {n_rows, n_cols} tensor
  def df_to_tensor(df) do
    df
    |> DF.names()
    |> Enum.map(&Explorer.Series.to_tensor(df[&1]))
    |> Nx.stack(axis: 1)
  end
end
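For reference, Helper.df_to_tensor lays the data out the way the model expects, one row per observation and one column per feature; a tiny illustration with made-up values:

DF.new(a: [1.0, 2.0], b: [3.0, 4.0])
|> Helper.df_to_tensor()
# => a {2, 2} tensor: [[1.0, 3.0], [2.0, 4.0]]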
# weather and stream station ids, closest to the dam, see here for more info:
# https://realtimedata.waternsw.com.au/
weather_station_ids = ["563035", "563046", "563079", "568045", "568051"]
stream_station_ids = ["212250", "212270"]
water_level_df =
  WaterAPI.get_ts_trace(
    "212242",
    "20080130000000",
    "20220112000000",
    ["130.00"],
    "day",
    "mean",
    datasource: "CP"
  )
  |> DF.new()
  |> DF.mutate(v: Series.cast(v, :float))
rainfall_dfs =
  for station_id <- weather_station_ids do
    WaterAPI.get_ts_trace(
      station_id,
      "20080130000000",
      "20220112000000",
      ["10.00"],
      "day",
      "tot",
      datasource: "CP"
    )
    |> DF.new()
    |> DF.mutate(v: Series.cast(v, :float))
  end
stream_dfs =
  for station_id <- stream_station_ids do
    WaterAPI.get_ts_trace(
      station_id,
      "20080130000000",
      "20220112000000",
      ["100.00"],
      "day",
      "mean",
      datasource: "CP"
    )
    |> DF.new()
    |> DF.mutate(v: Series.cast(v, :float))
  end
Deriving 'Water Level Difference'
# Make a new column holding the water level shifted back by one day, i.e. tomorrow's level
water_level_tomorrow =
  water_level_df["v"]
  |> Series.shift(-1)
  |> then(fn s -> DF.new(water_level_tomorrow: s) end)

water_level_df =
  water_level_df
  |> DF.concat_columns(water_level_tomorrow)
  |> DF.mutate(water_level_difference: water_level_tomorrow - v)
  |> DF.discard("water_level_tomorrow")
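Series.shift/2 with an offset of -1 pulls each value back one row and leaves nil in the last row, so every row's water_level_difference is tomorrow's level minus today's; a minimal illustration with made-up values:

Series.from_list([1.0, 2.0, 4.0])
|> Series.shift(-1)
|> Series.to_list()
# => [2.0, 4.0, nil]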
# Drop water level readings flagged by quality codes 201 and 255
water_level_df =
  DF.filter(
    water_level_df,
    q != 201 and q != 255
  )
# For rainfall and stream flow, zero out values flagged by quality codes 201 and 255:
# cast the quality check to a 0/1 mask and multiply it into v
rainfall_dfs =
  for df <- rainfall_dfs do
    df
    |> DF.mutate(m: cast(q != 201 and q != 255, :integer))
    |> DF.mutate(v: v * m)
    |> DF.discard(:m)
  end

stream_dfs =
  for df <- stream_dfs do
    df
    |> DF.mutate(m: cast(q != 201 and q != 255, :integer))
    |> DF.mutate(v: v * m)
    |> DF.discard(:m)
  end
water_level_df = DF.rename(water_level_df, q: "q_212242_130", v: "v_212242_130")

rainfall_df =
  rainfall_dfs
  |> Helper.rename_cols_by_id(weather_station_ids, "10")
  |> Helper.join_dfs()

stream_df =
  stream_dfs
  |> Helper.rename_cols_by_id(stream_station_ids, "100")
  |> Helper.join_dfs()

df =
  water_level_df
  |> DF.join(rainfall_df)
  |> DF.join(stream_df)

DF.names(df)
Prepare Data for Neural Net
feature_columns = [
  "v_568051_10",
  "v_568045_10",
  "v_563079_10",
  "v_563046_10",
  "v_563035_10",
  "v_212250_100",
  "v_212270_100"
]

label_column = "water_level_difference"
model_columns = [label_column | feature_columns]
df = DF.select(df, model_columns)
{train_df, test_df} = Helper.split_data(df, 0.8)
{train_df, validation_df} = Helper.split_data(train_df, 0.8)

training_batches = Helper.df_to_batches(train_df, feature_columns, label_column, 8)
validation_batches = Helper.df_to_batches(validation_df, feature_columns, label_column, 1)
testing_batches = Helper.df_to_batches(test_df, feature_columns, label_column, 1)
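With seven feature columns and a batch size of 8 (plus leftover: :discard in the helper), each training batch should pair an {8, 7} feature tensor with an {8, 1} label tensor; a quick, optional shape check:

{features, labels} = training_batches |> Enum.take(1) |> hd()
{Nx.shape(features), Nx.shape(labels)}
# => {{8, 7}, {8, 1}}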
row_no = DF.n_rows(df)

graph_data =
  DF.concat_columns(
    df[[label_column]],
    DF.new(%{"count" => Enum.to_list(1..row_no)})
  )
  |> DF.to_rows()
Vl.new(width: 800, height: 600)
|> Vl.data_from_values(Enum.take(graph_data, 300))
|> Vl.encode(:y, field: "water_level_difference", type: :quantitative)
|> Vl.encode(:x, field: "count", type: :quantitative)
|> Vl.mark(:line)
|> Vl.param("grid", select: :interval, bind: :scales)
model =
  Axon.input("stream_and_rain_model")
  |> Axon.dropout(rate: 0.5)
  |> Axon.dense(16)
  |> Axon.dropout(rate: 0.5)
  |> Axon.relu()
  |> Axon.dense(1)
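The network is a small MLP: the seven features pass through dropout, a 16-unit dense layer, dropout and ReLU, then a single output unit for the predicted change in water level. table_rex is in the dependency list above, presumably for Axon's table display; a sketch assuming Axon.Display.as_table/2 is available in this Axon version:

model
|> Axon.Display.as_table(Nx.template({8, 7}, :f32))
|> IO.puts()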
model_params =
  model
  |> Axon.Loop.trainer(:mean_absolute_error, Axon.Optimizers.adam(0.001))
  |> Axon.Loop.validate(model, validation_batches)
  |> Axon.Loop.metric(:mean_absolute_error, "validation_loss")
  |> Axon.Loop.run(training_batches, %{}, epochs: 50)
model
|> Axon.Loop.evaluator()
|> Axon.Loop.metric(:mean_absolute_error)
|> Axon.Loop.run(testing_batches, model_params, epochs: 1)
Prediction vs Actual Change in Water Level
all_features = Helper.df_to_tensor(df[feature_columns])

all_labels =
  df[label_column]
  |> Series.to_tensor()
  |> Nx.to_flat_list()

{_init_fn, predict_fn} = Axon.build(model, mode: :inference)

predictions =
  predict_fn.(model_params, all_features)
  |> Nx.to_flat_list()
# One index per prediction (rather than a hard-coded row count)
row_no = length(predictions)

chart_data =
  DF.new(
    prediction: predictions,
    actual: all_labels,
    count: Enum.to_list(1..row_no)
  )
  |> DF.to_rows()
Vl.new(width: 400, height: 400)
|> Vl.data_from_values(Enum.take(chart_data, 300))
|> Vl.layers([
  Vl.new()
  |> Vl.param("prediction_chart", select: :interval, bind: :scales, encodings: ["x", "y"])
  |> Vl.encode(:y, field: "prediction", type: :quantitative)
  |> Vl.encode(:x, field: "count", type: :quantitative)
  |> Vl.mark(:line, color: "orange"),
  Vl.new()
  |> Vl.encode(:x, field: "count", type: :quantitative)
  |> Vl.encode(:y, field: "actual", type: :quantitative)
  |> Vl.mark(:line, color: "blue")
])