Skip to content

Commit 81a41fb

Browse files
authored
Merge pull request #400 from leondavi/source
improve compression of data in sources add notice of receiveing data …
2 parents 98f1801 + d5f9fd7 commit 81a41fb

File tree

3 files changed

+8
-6
lines changed

3 files changed

+8
-6
lines changed

src_erl/NerlnetApp/src/Source/castingHandler.erl

+2-3
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@ init(Req0, [Action,Source_StateM_Pid]) ->
1818
%% io:format("casting handler got Body:~p~n",[Body]),
1919
case Action of
2020
csv -> {_ , Body} = nerl_tools:read_all_data(Req0 , <<>>),
21-
{WorkersList, Phase, NumOfBatches, NerlTensorType, Data} = binary_to_term(Body),
22-
UncompressedData = binary_to_list(zlib:uncompress(Data)),
23-
gen_statem:cast(Source_StateM_Pid, {batchList, WorkersList, list_to_atom(Phase), list_to_integer(NumOfBatches), NerlTensorType , UncompressedData});
21+
{WorkersList, Phase, NumOfBatches, NerlTensorType, CompressedData} = binary_to_term(Body),
22+
gen_statem:cast(Source_StateM_Pid, {batchList, WorkersList, list_to_atom(Phase), list_to_integer(NumOfBatches), NerlTensorType , CompressedData});
2423
startCasting -> {_,Body,_} = cowboy_req:read_body(Req0),
2524
gen_statem:cast(Source_StateM_Pid, {startCasting,Body});
2625
statistics -> gen_statem:cast(Source_StateM_Pid, {statistics});

src_erl/NerlnetApp/src/Source/parser.erl

+2-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ dataStrToNumericParallelLoop(PF, EtsTable, ListOfLinesOfData, ErlType, LastKey)
105105
dataStrToNumericData(ListOfLinesOfData , ErlType)->
106106
EtsTable = ets:new(data_str_to_numeric_data, [ordered_set, public]),
107107
dataStrToNumericParallelLoop(?PARALLELIZATION_FACTOR, EtsTable, ListOfLinesOfData, ErlType, 0),
108-
[ element(?DATA_IDX, Attribute) || Attribute <- ets:tab2list(EtsTable)].
108+
NumericDataList = [ element(?DATA_IDX, Attribute) || Attribute <- ets:tab2list(EtsTable)],
109+
ets:delete(EtsTable), NumericDataList.
109110

110111
generateListOfBatches(ListOfList, BatchSize) -> generateListOfBatches(ListOfList, BatchSize, []).
111112

src_erl/NerlnetApp/src/Source/sourceStatem.erl

+4-2
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,14 @@ state_name(_EventType, _EventContent, State = #source_statem_state{}) ->
117117

118118

119119
%% This cast receive a list of samples to load to the records batchList
120-
idle(cast, {batchList, WorkersList, Phase, NumOfBatches, NerlTensorType, Data}, State) ->
120+
idle(cast, {batchList, WorkersList, Phase, NumOfBatches, NerlTensorType, CompressedData}, State) ->
121121
EtsRef = get(source_ets),
122122
StatsEtsRef = get(source_stats_ets),
123123
MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX),
124124
BatchSize = ets:lookup_element(EtsRef, batch_size, ?DATA_IDX),
125-
{NerlTensorBatchesList, SampleSize} = parser:parseCSV(MyName, BatchSize, NerlTensorType, Data), % TODO this is slow and heavy policy! pre parse in ETS a possible solution
125+
?LOG_NOTICE("Source ~p, Receiving and parsing data", [MyName]),
126+
UncompressedData = binary_to_list(zlib:uncompress(CompressedData)),
127+
{NerlTensorBatchesList, SampleSize} = parser:parseCSV(MyName, BatchSize, NerlTensorType, UncompressedData), % TODO this is slow and heavy policy! pre parse in ETS a possible solution
126128
ets:update_element(EtsRef, workers_list, [{?DATA_IDX, WorkersList}]),
127129
ets:update_element(EtsRef, num_of_batches, [{?DATA_IDX, NumOfBatches}]),
128130
ets:update_element(EtsRef, current_phase, [{?DATA_IDX, Phase}]),

0 commit comments

Comments
 (0)