
Improve performance for Trip Segmentation Stage #1041

Open

humbleOldSage opened this issue Jan 15, 2024 · 18 comments

@humbleOldSage

Aim: To improve the performance of the trip segmentation stage of the pipeline by reducing the number of DB calls and performing more in-memory operations (potentially using pandas).

humbleOldSage commented Jan 15, 2024

Spent a week and a half figuring out the pipeline flow (including the trip segmentation flow) from:

a. Dr. Shankari's thesis

b. discussion with @MukuFlash03, who pointed to issue #950 as a good way to understand the flow of the prediction pipeline.

The pipeline begins in the run_intake_pipeline_for_user function in intake_stage.py, which also shows the various stage calls.

We'll currently focus on the eaist.segment_current_trips stage (in emission/analysis/intake/segmentation/trip_segmentation.py).

humbleOldSage commented Jan 15, 2024

The very first point of improvement I could find is related to the invalidation of out-of-order data:

out_of_order_points = loc_df[loc_df.ts.diff() < 0]
if len(out_of_order_points) > 0:
      .
      .
      .
      .
    out_of_order_id_list = out_of_order_points["_id"].tolist()
    logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
    for ooid in out_of_order_id_list:
        ts.invalidate_raw_entry(ooid)

where invalidate_raw_entry is as below :

    def invalidate_raw_entry(self, obj_id):
        self.timeseries_db.update_one({"_id": obj_id, "user_id": self.user_id}, {"$set": {"invalid": True}})

This is done one ID at a time, which can be inefficient, especially with a large list.
We can improve this as below:

  1. Create a list of update operations, something like below (this requires importing UpdateOne from pymongo):

update_operations = [
    UpdateOne({"_id": obj_id, "user_id": self.user_id}, {"$set": {"invalid": True}})
    for obj_id in out_of_order_id_list
]

  2. Then use the bulk_write method to execute all the updates at once:


if update_operations:
    self.timeseries_db.bulk_write(update_operations)

humbleOldSage commented Jan 16, 2024

To handle a list that is too big, we can split it into batches and then issue bulk operations on those batches, as sketched below.
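A minimal sketch of what such a batched variant could look like, assuming self.timeseries_db is the same pymongo collection used by invalidate_raw_entry (the method name and the batch size of 1000 are illustrative, not part of the existing code):

from pymongo import UpdateOne

def invalidate_raw_entries_in_batches(self, id_list, batch_size=1000):
    # Process the id list in fixed-size chunks so that a single bulk_write
    # never has to hold an arbitrarily large operation list in memory.
    for i in range(0, len(id_list), batch_size):
        batch = id_list[i:i + batch_size]
        update_operations = [
            UpdateOne({"_id": obj_id, "user_id": self.user_id},
                      {"$set": {"invalid": True}})
            for obj_id in batch
        ]
        if update_operations:
            self.timeseries_db.bulk_write(update_operations)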

@humbleOldSage

To test the above, I flipped the loc_df dataframe as

 loc_df=loc_df[::-1]

thus allowing us to test the out_of_order part of the code.

Another thing I tried was experimenting with the scale of data by appending the same dataframe to the end of itself as :

 for _ in range(n_fold):
    loc_df=pd.concat([loc_df,loc_df],ignore_index=True)

where n_fold is the number of times the dataframe is doubled. After this, I reversed the dataframe as above.

Results were as below:

| | 327 out-of-order points | 327*5 out-of-order points |
| --- | --- | --- |
| Time taken by old implementation | 0.363 s | 12.101 s |
| Time taken by new implementation | 0.037 s | 0.777 s |
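For reference, a minimal sketch of the kind of timing harness behind these numbers, assuming ts exposes the same timeseries_db collection used above and that update_operations was built as in the earlier snippet (time.perf_counter is used for illustration):

import time

# Old path: one update_one per out-of-order id.
start = time.perf_counter()
for ooid in out_of_order_id_list:
    ts.invalidate_raw_entry(ooid)
old_elapsed = time.perf_counter() - start

# New path: a single bulk_write over the prepared UpdateOne list.
start = time.perf_counter()
if update_operations:
    ts.timeseries_db.bulk_write(update_operations)
new_elapsed = time.perf_counter() - start

print("old: %.3f s, new: %.3f s" % (old_elapsed, new_elapsed))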

@humbleOldSage

I did try the batch method (with bulk_write) as well, using different batch sizes (500, 1000, 2000), but there was no improvement compared to a single bulk_write.

@humbleOldSage

PR: e-mission/e-mission-server#953

humbleOldSage commented Jan 17, 2024

There are two other dependencies on invalidate_raw_entry that we'll have to handle after this update:

  1. from dwell_segmentation_dist_filter.py
  2. from abstract_timeseries.py
 grep -rl invalidate_raw_entry | grep -v __pycache__
./emission/analysis/intake/segmentation/trip_segmentation.py
./emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py
./emission/storage/timeseries/abstract_timeseries.py
./emission/storage/timeseries/builtin_timeseries.py

@shankari

  1. For the invalid entries improvement, I don't disagree, but I am not sure that it is the biggest bang for the buck. Invalid entries are very rare, and this is a corner case that we handle to avoid errors. Before working on that any further, can you estimate how many invalid entries we actually have in our datasets, to determine whether to prioritize this?
  2. To clarify again, you tested the improvement with this, but you did it on fake data. There is no evidence that this will actually improve real-world performance.
  3. For performance improvements on existing/running systems, you don't start by looking through the code to find something that you can understand and optimize. Instead, you start by looking at the system logs to understand where the time is being spent, then you look at the code related to that, and then you optimize it.

For example, in the issue here: #950
we did not start by looking at the code and determining what incremental improvement we should do. Instead, we looked at the logs and determined, through timestamps, where the time was being spent, and then determined why and then determined how to fix it.

humbleOldSage commented Jan 22, 2024

Looking into the Android runtime for now.
It took ~6.8 s to run the entire segment_current_trips from testSegmentationWrapperAndroid in TestTripSegmentation.py, out of which ~6.775 s is spent running segment_into_trips from dwell_segmentation_time_filter.py.

Out of this, almost all the time (~6.774 s) goes into the for loop (335 iterations):

.
.
 for idx, row in filtered_points_df.iterrows():
.
.

Inside the loop, each iteration took

~0.0043 s to run from the beginning of the loop:

currPoint = ad.AttrDict(row)
 .
 .
 .

till

.
.
last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1)

AND another ~0.011 s from

.
.
.
if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes,transition_df):

till the rest of the loop, i.e., till

            else:
                prevPoint = currPoint

Meaning that the second part of the loop is currently taking the most time.
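A minimal sketch of the instrumentation behind these numbers, assuming we simply record wall-clock deltas around the two halves of the loop body (the log format is illustrative):

import logging
import time

for idx, row in filtered_points_df.iterrows():
    iter_start = time.time()
    # ... first part: currPoint, last10Points_df, last5MinsPoints_df, last5MinTimes ...
    first_part_end = time.time()
    # ... second part: has_trip_ended() and the trip-end / prevPoint handling ...
    iter_end = time.time()
    logging.debug("iteration %s: first part %.4f s, second part %.4f s" %
                  (idx, first_part_end - iter_start, iter_end - first_part_end))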

@humbleOldSage

In this part, has_trip_ended is the major contributor; inside it, is_tracking_restarted_in_range took ~0.005 s and get_ongoing_motion_in_range took ~0.004 s (this is still inside the for loop).

To improve is_tracking_restarted_in_range:

There was this db call

tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts,
                    endTs=end_ts)
transition_df = timeseries.get_data_df("statemachine/transition", tq)

which was run in every iteration. Instead, we can place this upstream, outside the for loop in segment_into_trips (dwell_segmentation_time_filter.py), and get all the transitions for the user in a df beforehand. However, there is a call to is_tracking_restarted_in_range from further upstream in segment_current_trips -> create_places_and_trips -> found_untracked_period -> _is_tracking_restarted (trip_segmentation.py), so we'll make the db calls from segment_current_trips.

Once we have the df, since all the rows are sorted by ts, we can do a log(n) search for the start index and end index in every iteration, as below.

    transition_df_start_idx=transition_df.ts.searchsorted(start_ts,side='left')    
    transition_df_end_idx=transition_df.ts.searchsorted(end_ts,side='right')
    transition_df_for_current=transition_df.iloc[transition_df_start_idx:transition_df_end_idx]

Similar changes in get_ongoing_motion_in_range, where I moved

    tq = estt.TimeQuery(timeType = "data.ts", startTs = start_ts,
                        endTs = end_ts)
    motion_list = list(timeseries.find_entries(["background/motion_activity"], tq))

upstream to extract into a df, and replaced it with:

   motion_df_start_idx=motion_df.ts.searchsorted(start_ts,side='left')    
   motion_df_end_idx=motion_df.ts.searchsorted(end_ts,side='right')
   motion_list=motion_df.iloc[motion_df_start_idx:motion_df_end_idx]
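A minimal sketch of what the upstream placement could look like, assuming the dataframes are fetched once per user in segment_current_trips and passed down; the simplified helper below only illustrates the slicing, not the full restart/motion check:

# Fetch once, before the per-point loop (sketch; variable names are illustrative).
tq = estt.TimeQuery(timeType="data.ts", startTs=time_query.startTs,
                    endTs=time_query.endTs)
transition_df = ts.get_data_df("statemachine/transition", tq)
motion_df = ts.get_data_df("background/motion_activity", tq)

# Each helper then slices the pre-fetched df instead of issuing its own db query.
def get_transitions_in_range(start_ts, end_ts, transition_df):
    start_idx = transition_df.ts.searchsorted(start_ts, side='left')
    end_idx = transition_df.ts.searchsorted(end_ts, side='right')
    return transition_df.iloc[start_idx:end_idx]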

humbleOldSage commented Jan 22, 2024

With these changes, the runtime came down to ~0.0001 s (from ~0.005 s) for is_tracking_restarted_in_range and ~0.0001 s (from ~0.004 s) for get_ongoing_motion_in_range.

So, all in all, the second part of the for loop that took ~0.011 s came down to ~0.0006 s.

In total, the segment_current_trips runtime reduced to ~2.12 s.

@humbleOldSage

e-mission/e-mission-server@1d1b31f handles these changes. This was bound to fail.

humbleOldSage commented Jan 22, 2024

And with these changes inside the for loop, the first part now takes longer to run (0.0043 s per iteration, as mentioned above).

I tried some pandas improvements, which might be overkill (in which case we can roll this back), and which reduced the overall runtime from ~2.12 s to ~1.5 s.

For this, I started by adding a vectorised implementation in calDistance (emission/core/common.py):

def calDistance(point1, point2, coordinates=False):
    .
    .
    rest of the code
    .
    .

    if isinstance(point1, np.ndarray) and isinstance(point2, np.ndarray):
        dLat = np.radians(point1[:,1] - point2[:,1])
        dLon = np.radians(point1[:,0] - point2[:,0])
        lat1 = np.radians(point1[:,1])
        lat2 = np.radians(point2[:,1])

        a = (np.sin(dLat/2) ** 2) + ((np.sin(dLon/2) ** 2) * np.cos(lat1) * np.cos(lat2))
        c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a))
        d = earthRadius * c

        return d
    .
    .
    rest of the implementation

and in segment_into_trips, the calculations for last5MinTimes and others are vectorised.
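A minimal sketch of how a per-point distance calculation can then use the array branch above instead of a row-wise apply (the column and variable names are illustrative and assume loc_df's flattened longitude/latitude columns):

import numpy as np

# Build an (N, 2) array of [longitude, latitude] for the last 10 points and
# compare them against the current point in a single vectorised call.
last10Points = last10Points_df[["longitude", "latitude"]].to_numpy()
currPointArr = np.tile([currPoint.longitude, currPoint.latitude],
                       (len(last10Points), 1))
last10PointsDistances = calDistance(last10Points, currPointArr)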

humbleOldSage commented Jan 25, 2024

For the iOS wrapper, I made the necessary changes to support the generalised trip flow, and that brought the iOS runtime to 0.3 s.

However, when running the combinedWrapper (iOS+Android), the loc_df db call

    loc_df = ts.get_data_df("background/filtered_location", time_query)

doesn't work when placed upstream in trip_segmentation.py and passed down to dwell_segmentation_dist_filter.py and dwell_segmentation_time_filter.py. It only seems to work correctly when the call is made separately inside dwell_segmentation_dist_filter.py and dwell_segmentation_time_filter.py.

For now, on dummy data, this additional call increases the Android runtime from ~1.5 s to ~1.6 s and iOS from ~0.3 s to ~0.4 s.

Let's try to figure out why the upstream call isn't working.

humbleOldSage commented Jan 25, 2024

Also, the current CombinedWrapper runtime is 2.1 s.

@humbleOldSage

On the side, after yesterday's PR, I was expecting all the tests to pass.

Figured from the logs that the testJackUntrackedTimeMar12 test from /Users/ssaini/Documents/GitHub/e-mission-server/emission/tests/analysisTests/intakeTests/TestPipelineRealData.py was failing after get_ongoing_motion_in_range was shifted from list-based to df-based in restart_checking.py. The dataframe implementation was unable to handle the empty-df case. This was fixed accordingly.
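A minimal sketch of the kind of guard that handles the empty-df case (the actual fix may differ; this is only an illustration):

def get_ongoing_motion_in_range(start_ts, end_ts, motion_df):
    # An empty dataframe returned by get_data_df may not even have a `ts`
    # column, so bail out before trying to slice it.
    if len(motion_df) == 0:
        return motion_df

    start_idx = motion_df.ts.searchsorted(start_ts, side='left')
    end_idx = motion_df.ts.searchsorted(end_ts, side='right')
    return motion_df.iloc[start_idx:end_idx]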

humbleOldSage added a commit to humbleOldSage/e-mission-server that referenced this issue Feb 4, 2024
The changes below that led to these performance upgrades are investigated in e-mission/e-mission-docs#1041. They are:

1. db calls for the transition and motion dataframes are moved upstream from the `is_tracking_restarted_in_range` and `get_ongoing_motion_in_range` functions in `restart_checking.py` to `trip_segmentation.py`. The multiple db calls (one per iteration) in the old setting now happen once in the improved setting.

2. All the other changes in `trip_segmentation.py` and `dwell_segmentation_dist_filter.py` are just to support the change in point 1 (above).

3. In `dwell_segmentation_time_filter.py`, other than the changes to support point 1 (above), there is an additional improvement. The calculations for `last10PointsDistances` and `last5MinsPoints` are vectorised. For this, `calDistance` in `common.py` now supports numpy arrays.

humbleOldSage commented Feb 4, 2024

In dwell_segmentation_time_filter.py, for evaluating last10PointsDistances and last5MinsDistances, the calculations are now vectorised. To support the vectorised calculations, calDistance in common.py now has numpy-related changes.

This brought the runtime for this entire loop

 for idx, row in filtered_points_df.iterrows():

from ~2 s to ~1.2 s over 327 iterations.

humbleOldSage added a commit to humbleOldSage/e-mission-server that referenced this issue Feb 4, 2024
Instead of invalidating one ooid from the list at a time, use UpdateOne and bulk_write to invalidate the entire list. This is supported by the findings here: e-mission/e-mission-docs#1041 (comment)
humbleOldSage added a commit to humbleOldSage/e-mission-server that referenced this issue Feb 6, 2024
The changes below that led to these performance upgrades are investigated in e-mission/e-mission-docs#1041. They are:

1. db calls for the transition and motion dataframes are moved upstream from the `is_tracking_restarted_in_range` and `get_ongoing_motion_in_range` functions in `restart_checking.py` to `trip_segmentation.py`. The multiple db calls (one per iteration) in the old setting now happen once in the improved setting.

2. All the other changes in `trip_segmentation.py`, `dwell_segmentation_dist_filter.py` and `dwell_segmentation_time_filter.py` are just to support the change in point 1 (above).
@shankari

I think we should revisit this from first principles and then validate that what Satyam came up with aligns with what you find.
I would like to handle this in a principled fashion, by:

  • instrumenting to see the bottlenecks
  • fixing the bottlenecks
  • showing the improvement

we can instrument in two ways:

  • add additional logs
  • we can insert additional entries as server stats (as in emission/pipeline/intake_stage.py)

Pro of the logs: it is quick and easy to see as you are making changes
Pro of the stats: you can see the behavior in production without having to pull logs from cloudwatch

I would suggest additional logs initially, so you can fix it and potentially generate some stats around improvements on pulled data and do lots of analysis.

And then we deploy to production, and then we can run analyses (before and after) on multiple deployments of various sizes.
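A minimal sketch of the quick logging option, assuming we bracket a suspect section with timestamps (the message format is illustrative; the server-stats variant would record a similar entry instead of logging it):

import logging
import time

section_start = time.time()
# ... suspected bottleneck, e.g. the segmentation loop for one user ...
logging.info("segment_current_trips: section took %.3f s" %
             (time.time() - section_start))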
