-
Notifications
You must be signed in to change notification settings - Fork 4
/
utils.py
2710 lines (2206 loc) · 125 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import logging
import pandas as pd
from datetime import datetime, timedelta
from dateutil.parser import parse
from typing import Tuple,Dict, Optional, Any, List, Union
from configobj import ConfigObj, Section
import os
from io import StringIO
import xml.etree.ElementTree as ET
import re
from astropy.io import fits
import struct
import requests
import math
import sys
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut,GeocoderUnavailable
import numpy as np
import warnings
import time
warnings.filterwarnings("ignore")
utils_version = '1.3.11'
'''
#utils_version = '1.1.4'
# Changes:
# Tuesday 9th January 2024
# Ensure all self.headers['IMAGETYP'] are upper case
# Corrected position print of 'imagetype' for DARK frames
# Wednesday 10th January 2024
# Ensure that if all geocoding fails, location_string is set to config['defaults']['SITE']
# Log all geocoding errors returned by geopy
# v1.1.3 11th January 2024
# Deal with WBPP
# Removed duplicate light frames from header; these are typically found in WBPP directories
# If master calibration files are found, drop the corresponding calibration frames and only use the masters
# 12th January 2024
# Added grouping on object (target after name change) in aggregate_parameters. Function was not counting light frames accurately
# added check for master dark, bias and flats. If found any dark, bias or flats frames are ignored
# v1.1.4 13th January 2024
# Redesign of how code parses data files
# Allow the use of symbolic links to calibration data directories when reading data into the program using the function process_directory
# Removed use of WBPP keyword
# 22nd January 2024
# Bug fix: check for master dark, bias and flatdarks. If found any dark, bias or flatdarks frames are ignored
# 24th January 2024
# Corrected FWHM calculation to fwhm = hfr * imscale*2 from fwhm = hfr * imscale
# Corrected observation sessions to be the number of unique dates in the dates set/2
# Corrected the mean session temparature to be the mean of the just ImageType = lights temperature column
# Corrected random selection of image directory when removing duplicates when processing WBPP directories. Force use of _c.xisf files as they contain the correct header information
# Tested 26 directory structures all passed
##############################################################################################################################################################################
#Saved as version 1.2.0 this is now a candidate release version
# 28th January 2024
# Corrected use of the wrong column name for sensorCooling. Used sensorTemperature when sensorCooling was required by Astrobin
# Corrected case where .xisf files that have been processed by PixInsight can have no header information. This seems to occur when data is processed after calibration
# and the WBPP directory is used, which contains other images that are post-processed. Some do not contain header information
# Corrected error where code was adding the same header to the aggregating data frame twice ( big miss!!)
# 29th January 2024
# Changed logic in site.process_new_location
# 1.0 Ensure that if all geocoding fails, location_string is set to config['defaults']['SITE'] in all cases
# 2.0 Dont geocode again if location_string is already set to config['defaults']['SITE']
# 3.0 If bortle and sqm returned are 0,0 then set them to config['defaults']['BORTLE'] and config['defaults']['SQM']
# 4.0 Log all geocoding errors returned by geopy
# Capture error when self.location_info = self.find_location_by_coords(est, lat_long_pair, 2) returns None
# This should not occur so code needs further investigation added logging to enable this
# 30th January 2024
# Version 1.2.3
# Corrected logic that causes coded to claim sites that had been seen were not seen.
# Improved logging to make it easier to read
# Added measurement and reporting of session temperature statistics
# Added measurement and reporting of the total number of images processed
# Improved layout of summary report
# 31st January 2024
# Corrected observation session count
# 6th February 2024
# Version 1.2.4
# Corrected observation session count it was not corrected before
# had to leave date obs as date times to get the correct date,
# calculate session statistics in aggergate_parameters then convert to date after session calculations
# Minor formatting changes on summary report to align +ve and -ve temperature values
# 7th February 2024
# Version 1.2.5
# added check for session date calculations, any errors will be logged and code reporst session date information not available
# 7th February 2024
# Version 1.2.8
# dump data frames to .csv file for analysis
# 8th February 2024
# Corrected error in the function observation period where ['name'][0] was used instead of ['name'].iloc[0]
# 9th February 2024
# Version 1.2.10
# Ensured that only master flats and master darks that have the same filters and exposures as the light frames are used
# 12th February 2024
# Release version 1.3.0
# 27th February 2024
# Version 1.3.1
# Modified debugging file dumps, so they occour after the data has been processed not all at the the end
# 29th February 2024
# Version 1.3.2
# Code added to handle FOCUSER and SITENAME SGP-PRO keywords
# Code added to deal with situation where keyword pairs conflict with each other
# 'EXPTIME' and 'EXPOSURE' in the same data frame
# 'LAT-OBS' and 'SITELAT' in the same data frame
# Handles mutiple MasterFlat frames for same filter
# Correctly total MasterFlat frames in summary and astrobin output
# Ensure all calibration frame locations are set equal to the nearest light frame location
# Stops calibration frames generating their own site location
# Version 1.3.3
# 4th March 2024
# Corrected error in aggregate_parameters where if no MASTER frames are present the code would fail
# Version 1.3.4
# 5th March 2024
# Corrected utf-8 encoding errror with logging
# Reset index on group in summerize session such that group['target'].iloc[0] returns the correct value
# formatted time output seconds are now shown to 2dp
# Version 1.3.6
# 16 June 2024
# No changes, but matches the version number of the main package
# Version 1.3.7
# allows the scipt to be called from an image directory and process the images in that directory but allows for calibration directories to be passed as arguments
# eg AstroBinUploadv1_3_7.py "." /home/user/images/calibration
# version 1.3.8
# 28th September 2024
# Allows the processing of LIGHTFRAMES as well as LIGHT frames
# Modification in the process headers function to allow the processing of LIGHTFRAMES as well as LIGHT frames
# version 1.3.9
# 28th September 2024
# Allows the processing of LIGHTFRAMES and Light Frames as well as LIGHT frames
# Modification in the process headers function to allow the processing of LIGHTFRAMES Light Frames as well as LIGHT frames
# version 1.3.10
# 29th September 2024
# Deals with the case where a fractional part of a second is present in some date-obs keyword but not in others
# Deals with the case where the filter names in the light frames have trailing white spaces.
# version 1.3.11
# 16th October 2024
# Fixes bug where running the script for the first time from the installation directory would fail
# Deals with the case where the filter names in the light frames have trailing white spaces.
# Modifed script to take a new default parameter for the config file
# The parameter is USEOBSDATE and is set to True if the actual date of the observation session is to be used when aggregating data
# for the astrobin upload .csv output.
# If this prameter is set to False then the date the observation session was started is used.
# added progress counter to the hfr processing
'''
def initialise_logging(log_filename: str) -> logging.Logger:
    """
    Initializes logging for the application.

    Creates (or reuses) the module-level logger, removes any handlers left
    over from a previous initialisation (closing them so their file
    descriptors are released), and attaches a file handler that truncates
    and writes to the specified log file.

    Parameters:
    - log_filename (str): The name of the log file.

    Returns:
    - logging.Logger: The configured logger object.
    """
    logger = logging.getLogger(__name__)
    # Remove and close any existing handlers so repeated calls neither
    # duplicate output nor leak open file descriptors.
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
        handler.close()
    logger.setLevel(logging.INFO)
    # mode='w' truncates the log file on open, replacing the previous
    # separate open(log_filename, 'w') truncation step.
    handler = logging.FileHandler(log_filename, mode='w', encoding='utf-8')  # Log to a file
    formatter = logging.Formatter('%(asctime)s - %(name)s - Line: %(lineno)d - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
def format_seconds_to_hms(seconds: int) -> str:
    """
    Render a duration in seconds as right-aligned 'H hrs M mins S.SS secs' columns.

    Hour and minute fields are blank when they would be zero (minutes are shown
    whenever hours are shown), while seconds are always present with two
    decimal places. Each field is right-justified in a 6-character column so
    values line up in tabular reports.

    Parameters:
    - seconds (int): The number of seconds to convert.

    Returns:
    - str: The formatted hours/minutes/seconds string.
    """
    whole_hours, remainder = divmod(seconds, 3600)
    whole_hours = int(whole_hours)
    whole_minutes = int(remainder // 60)
    leftover_secs = float(remainder % 60)
    hours_field = f"{whole_hours} hrs" if whole_hours > 0 else ""
    minutes_field = f"{whole_minutes} mins" if (whole_hours > 0 or whole_minutes > 0) else ""
    seconds_field = f"{leftover_secs:.2f} secs"
    return "{:>6} {:>6} {:>6}".format(hours_field, minutes_field, seconds_field)
def seconds_to_hms(seconds: int) -> str:
    """
    Render a duration in seconds as 'H hrs M mins S.SS secs'.

    Unlike format_seconds_to_hms, every component is always included (zero
    hours/minutes are printed as '0') and no column padding is applied.

    Parameters:
    - seconds (int): The number of seconds to convert.

    Returns:
    - str: The hours/minutes/seconds string.
    """
    whole_hours, remainder = divmod(seconds, 3600)
    whole_minutes = int(remainder // 60)
    leftover_secs = float(remainder % 60)
    return f"{int(whole_hours)} hrs {whole_minutes} mins {leftover_secs:.2f} secs"
def process_image_type(group: pd.DataFrame, imagetype: str, logger: logging.Logger) -> Tuple[str, float]:
    """
    Processes a specific image type and calculates total exposure time.

    This function processes a specific image type ('LIGHT', 'DARK', 'MASTERDARK', 'BIAS', 'MASTERBIAS', 'FLAT', 'MASTERFLAT')
    and calculates the total exposure time. It generates a detailed summary for each image type:
    LIGHT/LIGHTFRAME rows are reported per target then per filter; DARK/BIAS families are
    grouped by gain; FLAT families are grouped by filter.

    Parameters:
    - group (pd.DataFrame): The DataFrame containing the grouped data.
    - imagetype (str): The type of the image to process.
    - logger (logging.Logger): The logger to use for logging messages.

    Returns:
    - Tuple[str, float]: A tuple containing the detailed summary as a string and the total exposure time as a float.
    """
    logger.info(f"Starting to process image type: {imagetype}")
    lines = []
    # Restrict to only the rows of the requested image type.
    image_group = group[group['imageType'] == imagetype]
    total_exposure_time = 0  # accumulated across every sub-group processed below
    if imagetype in ['LIGHT', 'LIGHTFRAME']:
        format_string = " {:<8} {:<8} {:<8} {:<12} {:<12} {:<12} {:<12} {:<15} {:<15}"
        lines.append(f" {imagetype}S:")
        # One section per target, with a per-filter row inside each section.
        for target, target_group in image_group.groupby('target'):
            total_exposure_time_target = 0
            lines.append(f" Target: {target}\n")
            lines.append(format_string.format("Filter","Frames", "Gain", "Egain", "Mean FWHM", "Sensor Temp","Mean Temp", "Exposure", "Total Exposure"))
            for _, sub_group in target_group.groupby('filter'):
                frame_count = sub_group['number'].sum()
                exposure = sub_group['duration'].iloc[0].astype(float).round(2)
                # NOTE(review): gain is scaled by 0.1 and labelled dB — presumably the
                # camera reports gain in tenths of a dB; confirm against the capture driver.
                gain = f"{(sub_group['gain'].iloc[0]*.1).astype(float).round(2)} dB"
                egain =f"{sub_group['egain'].iloc[0].astype(float).round(2)} e/ADU"
                formatted_exposure = f"{exposure:.2f} secs" # Format for two decimal places
                total_exposure = frame_count * exposure
                total_exposure_time_target += total_exposure
                mean_fwhm = sub_group['meanFwhm'].mean().round(2)
                mean_temperature = f"{sub_group['temperature'].mean().round(1)}\u00B0C"
                sensor_temperature = f"{sub_group['sensorCooling'].mean().round(1)}\u00B0C"
                # Add a space before the number for positive temperatures so that
                # positive and negative values line up in the report columns.
                mean_temperature = f" {mean_temperature}" if mean_temperature[0] != '-' else mean_temperature
                sensor_temperature = f" {sensor_temperature}" if sensor_temperature[0] != '-' else sensor_temperature
                formatted_mean_fwhm = f"{mean_fwhm:.2f} arcsec" # Format for two decimal places
                lines.append(format_string.format(sub_group['filter'].iloc[0],frame_count, gain,egain,formatted_mean_fwhm, sensor_temperature, mean_temperature, formatted_exposure , format_seconds_to_hms(total_exposure)))
                logger.info(f"Processed {frame_count} frames for filter {sub_group['filter'].iloc[0]} with total exposure time {seconds_to_hms(total_exposure)}")
            lines.append(f"\n Exposure time for {target}: {seconds_to_hms(total_exposure_time_target)}\n")
            total_exposure_time+= total_exposure_time_target
    elif imagetype in ['DARK', 'MASTERDARK', 'BIAS', 'MASTERBIAS']:
        if imagetype == 'DARK' or imagetype == 'MASTERDARK' :
            # DARK variants include a sensor-temperature column; BIAS variants (below) do not.
            detail_format = " {:<10} {:<8} {:<8} {:<12} {:<12} {:<12} {:<15}"
            lines.append(f"\n {imagetype}S:\n")
            lines.append(detail_format.format("Filter", "Frames", "Gain", "Egain","Exposure","Sensor Temp", "Total Exposure"))
            for gain, sub_group in image_group.groupby('gain'):
                frame_count = sub_group['number'].sum()
                exposure = sub_group['duration'].iloc[0].astype(float).round(2)
                formatted_exposure = f"{exposure:.2f} secs" # Format for two decimal places
                total_exposure = frame_count * exposure
                total_exposure_time += total_exposure
                fgain = f"{gain*.1} dB"
                egain =f"{sub_group['egain'].iloc[0].astype(float).round(2)} e/ADU"
                sensor_temperature = f"{sub_group['sensorCooling'].mean().round(1)}\u00B0C"
                # Calibration frames may legitimately have no 'filter' column.
                filter_name = 'N/A' if 'filter' not in sub_group else sub_group['filter'].iloc[0]
                lines.append(detail_format.format(filter_name, frame_count, fgain, egain,formatted_exposure,sensor_temperature, format_seconds_to_hms(total_exposure)))
                logger.info(f"Processed {frame_count} frames for gain {fgain} with total exposure time {seconds_to_hms(total_exposure)}")
        else:
            # BIAS / MASTERBIAS: pluralised with 'ES' (BIASES) and no sensor-temperature column.
            lines.append(f"\n {imagetype}ES:\n")
            detail_format = " {:<10} {:<8} {:8} {:<12} {:<10} {:<15}"
            lines.append(detail_format.format("Filter", "Frames","Gain", "Egain", "Exposure", "Total Exposure"))
            for gain, sub_group in image_group.groupby('gain'):
                frame_count = sub_group['number'].sum()
                exposure = sub_group['duration'].iloc[0].astype(float).round(2)
                formatted_exposure = f"{exposure:.2f} secs" # Format for two decimal places
                total_exposure = frame_count * exposure
                total_exposure_time += total_exposure
                fgain = f"{gain*.1} dB"
                egain =f"{sub_group['egain'].iloc[0].astype(float).round(2)} e/ADU"
                filter_name = 'N/A' if 'filter' not in sub_group else sub_group['filter'].iloc[0]
                lines.append(detail_format.format(filter_name, frame_count, fgain, egain, formatted_exposure, format_seconds_to_hms(total_exposure)))
                logger.info(f"Processed {frame_count} frames for gain {fgain} with total exposure time {seconds_to_hms(total_exposure)}")
    elif imagetype in ['FLAT', 'MASTERFLAT']:
        detail_format = " {:<10} {:<8} {:<10} {:<15} {:<12} {:<15}"
        lines.append(f"\n {imagetype}S:\n")
        lines.append(detail_format.format("Filter", "Frames","Gain", "Egain", "Exposure", "Total Exposure"))
        # FLAT variants are grouped by filter; gain is averaged across the group.
        for _, sub_group in image_group.groupby('filter'):
            frame_count = sub_group['number'].sum()
            exposure = sub_group['duration'].iloc[0].astype(float).round(2)
            gain = f"{(sub_group['gain'].mean()*.1).round(2)} dB"
            egain =f"{sub_group['egain'].iloc[0].astype(float).round(2)} e/ADU"
            formatted_exposure = f"{exposure:.2f} secs" # Format for two decimal places
            total_exposure = frame_count * exposure
            total_exposure_time += total_exposure
            lines.append(detail_format.format(sub_group['filter'].iloc[0], frame_count, gain,egain,formatted_exposure, format_seconds_to_hms(total_exposure)))
            logger.info(f"Processed {frame_count} frames for filter {sub_group['filter'].iloc[0]} with total exposure time {seconds_to_hms(total_exposure)}")
    return "\n".join(lines), total_exposure_time
def site_details(group: pd.DataFrame, site: str, logger: logging.Logger) -> str:
    """
    Generates a summary of site details.

    Builds a small multi-line block describing the observing site: its name,
    latitude/longitude, Bortle scale, and SQM reading, all taken from the
    first row of the supplied DataFrame. Each line is also logged.

    Parameters:
    - group (pd.DataFrame): The DataFrame containing the grouped data.
    - site (str): The name of the site.
    - logger (logging.Logger): The logger to use for logging messages.

    Returns:
    - str: The site details.
    """
    logger.info("Generating site details summary")
    latitude = group['sitelat'].iloc[0]
    longitude = group['sitelong'].iloc[0]
    bortle = group['bortle'].iloc[0]
    sqm = group['meanSqm'].iloc[0]
    name_line = f"\n Site:\t{site}\n"
    logger.info(f" Site: {site}")
    coord_line = f"\tLatitude: {latitude}\u00B0, Longitude: {longitude}\u00B0\n"
    logger.info(f"Latitude: {latitude}\u00B0, Longitude: {longitude}\u00B0")
    sky_line = f"\tBortle scale: {bortle}, SQM: {sqm} mag/arcsec\u00B2\n"
    logger.info(f"Bortle scale: {bortle}, SQM: {sqm} mag/arcsec\u00B2")
    return name_line + coord_line + sky_line
def equipment_used(group: pd.DataFrame, df: pd.DataFrame, logger: logging.Logger) -> str:
    """
    Determines the equipment used.

    Builds a report section listing the hardware recorded on the first row of
    the grouped 'LIGHT' data (telescope, camera, filter wheel, focuser and
    rotator — omitting any entry recorded as the string 'None'), followed by
    the capture software gathered from both the group and any MASTER frames
    found in the full DataFrame.

    Parameters:
    - group (pd.DataFrame): The DataFrame containing the grouped data.
    - df (pd.DataFrame): The DataFrame containing the observation data.
    - logger (logging.Logger): The logger to use for logging messages.

    Returns:
    - str: The equipment used.
    """
    logger.info("Determining equipment used")
    row_template = " {:<20}: {}\n"
    parts = ["\n Equipment used:\n"]
    # (report label, DataFrame column) pairs, in report order.
    hardware_fields = [
        ("Telescope", "telescope"),
        ("Camera", "camera"),
        ("Filterwheel", "filterWheel"),
        ("Focuser", "focuser"),
        ("Rotator", "rotator"),
    ]
    for label, column in hardware_fields:
        value = group[column].iloc[0]
        if value != 'None':
            parts.append(row_template.format(label, value))
    logger.info(
        f"Equipment details: Telescope: {group['telescope'].iloc[0]}, "
        f"Camera: {group['camera'].iloc[0]}, Filterwheel: {group['filterWheel'].iloc[0]}, "
        f"Focuser: {group['focuser'].iloc[0]}, Rotator: {group['rotator'].iloc[0]}"
    )
    # Collect capture software from the group plus any master calibration frames.
    software_set = set(group['swcreate'].unique())
    for master_type in ['MASTERFLAT', 'MASTERDARKFLAT', 'MASTERBIAS', 'MASTERDARK']:
        master_rows = df[df['imageType'] == master_type]
        if not master_rows.empty:
            software_set.update(master_rows['swcreate'].unique())
    remaining = list(software_set)
    parts.append(row_template.format("Capture software", remaining.pop(0)))
    for extra_software in remaining:
        parts.append(row_template.format("", extra_software))
    logger.info(f"Capture software: {software_set}")
    return "".join(parts)
def target_details(group: pd.DataFrame, logger: logging.Logger) -> str:
    """
    Determines the target details.

    Normalizes the target names (underscores to spaces, lower-cased, collapsed
    whitespace) to find the unique targets, then checks whether the targets
    form a mosaic (names containing 'Panel'). Mosaics are reported as
    '<base> N Panel Mosaic'; otherwise the first unique target is reported.

    Parameters:
    - group (pd.DataFrame): The DataFrame containing the grouped data.
    - logger (logging.Logger): The logger to use for logging messages.

    Returns:
    - str: The target details.
    """
    logger.info("Determining target details")
    target_format = " {:<6} {}"
    raw_targets = pd.Series(group['target'].astype(str))
    logger.info(f"Target value: {raw_targets[0]}")
    # Normalize: underscores -> spaces, lower-case, collapse repeated whitespace.
    normalized = raw_targets.str.replace('_', ' ').str.lower().str.split().str.join(' ')
    # Map each normalized form back to an original spelling (last occurrence wins).
    mapping = pd.Series(raw_targets.values, index=normalized).to_dict()
    logger.info(f"Mapping: {mapping}")
    unique_values = pd.Series(list(mapping.keys())).unique()
    logger.info(f"Unique values: {unique_values}")
    targets = [mapping[key] for key in unique_values]
    logger.info(f"Unique targets: {targets}")
    panels = [name for name in targets if 'Panel' in name]
    if not panels:
        target = target_format.format("Target:", targets[0])
        logger.info(f"Target: {target}")
        return target
    # Mosaic: strip everything from 'Panel' onwards to recover the base name.
    mosaic_base = panels[0].split('Panel')[0].strip()
    target = target_format.format("Target:", f"{mosaic_base} {len(panels)} Panel Mosaic")
    logger.info(f"Target is a panel: {target}")
    return target
def observation_period(group_in: pd.DataFrame, logger: logging.Logger) -> str:
    """
    Formats the observation-period section of the session report.

    Reads the precomputed session columns ('start_date', 'end_date',
    'num_days', 'sessions', 'temp_min', 'temp_max', 'temperature') from the
    LIGHT rows of the supplied DataFrame and renders them as a formatted
    report block. Falls back to 'Session date information not available' when
    the 'imageType' column is missing, no LIGHT rows exist, or any date column
    holds the 1900-01-01 sentinel / zero counts.

    Parameters:
    - group_in (pd.DataFrame): The grouped observation data (copied, not mutated).
    - logger (logging.Logger): The logger to use for logging messages.

    Returns:
    - str: The formatted observation-period text.
    """
    # Work on a deep copy so the caller's frame is never mutated.
    group = group_in.copy(deep=True)
    logger.info("Determining observation period")
    period_format = " {:<25}: {}\n"
    period = "\n Observation period: \n"
    # 1900-01-01 is the sentinel written upstream when a date could not be computed.
    error_date = pd.to_datetime('1900-01-01')
    if 'imageType' not in group.columns:
        logger.error(" imageType is not a column in the group DataFrame.")
        logger.info("Output: Session date information not available")
        period += " Session date information not available\n"
        return period
    if (group['imageType'] != 'LIGHT').all():
        logger.info("No LIGHT frames available")
        period += " No LIGHT frames available\n"
        period += " Session date information not available\n"
        return period
    light_group = group[group['imageType'] == 'LIGHT']
    if not light_group.empty:
        # Session statistics are identical on every LIGHT row, so the first row suffices.
        start_date = light_group['start_date'].iloc[0]
        end_date = light_group['end_date'].iloc[0]
        num_days = light_group['num_days'].iloc[0]
        sessions = light_group['sessions'].iloc[0]
        if start_date == error_date or end_date == error_date or num_days == 0 or sessions == 0:
            period += " Session date information not available\n"
            logger.error("No valid date information available for observation period.")
            logger.info("Output: Session date information not available")
        else:
            period += period_format.format("Start date", start_date)
            period += period_format.format("End date", end_date)
            period += period_format.format(f"Days", num_days)
            period += period_format.format("Observation sessions", sessions)
            logger.info(f"Start date: {start_date}")
            logger.info(f"End date: {end_date}")
            logger.info(f"Session length: {num_days}")
            logger.info(f"Number of sessions: {sessions}")
        # Temperature statistics are reported even when the dates were invalid.
        period += period_format.format("Min temperature", f"{light_group['temp_min'].min().round(1)}\u00B0C")
        period += period_format.format("Max temperature", f"{light_group['temp_max'].max().round(1)}\u00B0C")
        period += period_format.format("Mean temperature", f"{light_group['temperature'].mean().round(1)}\u00B0C")
        logger.info(f"Min temperature: {light_group['temp_min'].min().round(1)}\u00B0C")
        logger.info(f"Max temperature: {light_group['temp_max'].max().round(1)}\u00B0C")
        logger.info(f"Mean temperature: {light_group['temperature'].mean().round(1)}\u00B0C")
        return period
    else:
        # NOTE(review): this branch looks unreachable — the all()-check above already
        # returned when no LIGHT rows exist, so light_group cannot be empty here.
        logger.info("No LIGHT frames available")
        return "\nObservation period: \n\tNo LIGHT frames available\n"
def get_number_of_images(number_of_images: int, logger: logging.Logger) -> str:
    """
    Formats the total processed-image count as a single summary report line.

    Parameters:
    - number_of_images (int): The total number of images processed.
    - logger (logging.Logger): The logger to use for logging messages.

    Returns:
    - str: A formatted line stating the total number of images processed.
    """
    logger.info("Determining number of images processed")
    logger.info(f"Total number of images processed: {number_of_images}")
    row_template = " {:<25}: {}\n"
    return row_template.format("Total number of images processed", number_of_images)
def summarize_session(df: pd.DataFrame, logger: logging.Logger, number_of_images: int) -> str:
    """
    Summarizes an observation session.

    This function generates a summary of an observation session, including details about the target,
    site, equipment used, observation period, and exposure time for each image type. The report is
    built per site (grouped on the 'site' column), with image types rendered in a fixed order.

    Parameters:
    - df (pd.DataFrame): The DataFrame containing the observation data.
    - logger (logging.Logger): The logger to use for logging messages.
    - number_of_images (int): The total number of images processed, appended at the end of the report.

    Returns:
    - str: The summary of the observation session.
    """
    logger.info("")
    logger.info("GENERATING OBSERVATION SESSION SUMMARY")
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Fixed presentation order for the per-image-type sections of the report.
    imagetype_order = ['LIGHT', 'FLAT', 'MASTERFLAT', 'DARKFLAT', 'BIAS', 'DARK', 'MASTERDARKFLAT', 'MASTERBIAS', 'MASTERDARK']
    summary_parts = [f"\n Observation session summary\n Generated {current_time}\n"]
    logger.info("Added initial summary part")
    logger.info("")
    for site, group in df.groupby('site'):
        # Target/site/equipment/period sections are driven by the LIGHT frames only.
        target_group = group[group['imageType'] == 'LIGHT'].reset_index(drop=True)
        if not target_group.empty:
            logger.info(f"Processing site: {site}")
            logger.info("")
            summary_parts.append(target_details(target_group ,logger))
            logger.info("Added target details to summary")
            logger.info("")
            target = site_details(target_group , site,logger)
            if target != 'No target':
                summary_parts.append(target)
            logger.info("Added site details to summary")
            logger.info("")
            summary_parts.append(equipment_used(target_group ,df,logger))
            logger.info("Added equipment used to summary")
            logger.info("")
            summary_parts.append(observation_period(target_group ,logger))
            logger.info("Added observation period to summary")
            logger.info("")
        # Per-image-type exposure sections for this site's frames.
        for imagetype in imagetype_order:
            if imagetype in group['imageType'].unique():
                logger.info(f"Processing image type: {imagetype}")
                summary, total_exposure_time = process_image_type(group, imagetype,logger)
                summary_parts.append(summary)
                if imagetype == 'LIGHT':
                    summary_parts.append(f" Total {imagetype} Exposure Time: {format_seconds_to_hms(total_exposure_time)}\n")
                else:
                    summary_parts.append(f"\n Total {imagetype} Exposure Time: {seconds_to_hms(total_exposure_time)}\n")
                logger.info(f"Finished processing image type: {imagetype}. Total exposure time: {seconds_to_hms(total_exposure_time)}")
                logger.info(f"Added image type {imagetype} to summary")
                logger.info("")
        # Visual separator between sites in the log file.
        logger.info("*"*200)
        logger.info("")
    summary_parts.append(get_number_of_images(number_of_images,logger))
    logger.info("Added number of images to summary")
    logger.info("")
    logger.info("Observation session summary generated")
    return "\n".join(summary_parts)
class Configuration:
@staticmethod
def cast_value(value: Any) -> Any:
"""
Casts the input value to an appropriate type.
This function attempts to cast the input value to a list, float, or int. If the input value is already a list,
it recursively applies the casting to each item in the list. If the input value is a string that contains a comma,
it splits the string at the comma and treats it as a list, recursively applying the casting to each item. If the
input value is a string that contains a period, it attempts to cast it to a float. If the input value is a string
that does not contain a period, it attempts to cast it to an int. If all casting attempts fail, it returns the
input value as is.
Parameters:
value (Any): The input value to be cast. This can be of any type.
Returns:
Any: The cast value. This can be a list, float, int, or string, depending on the input.
"""
if isinstance(value, list): # Check if value is already a list
return [Configuration.cast_value(item) for item in value]
if ',' in str(value): # Check if value is a list
return [Configuration.cast_value(item) for item in str(value).split(',')]
try:
if '.' in str(value): # Check if value can be a float
return float(value)
else:
return int(value)
except ValueError:
return value # return as is, if it's a string
    def __init__(self, config_filename, logger=None):
        """
        Create a Configuration bound to the given file.

        Stores the filename and logger, defines the default INI content used
        when no configuration file exists yet, then runs initialise_config()
        to load or create the file.

        Parameters:
        config_filename (str): Path of the configuration file to load or create.
        logger (logging.Logger, optional): Logger for progress/error messages.
        """
        self.filename = config_filename
        # Default configuration written verbatim when the file does not exist.
        # Sections: [defaults] = FITS keyword fallbacks, [filters] = AstroBin
        # filter codes, [secret] = API key/endpoint placeholders, [sites] = empty.
        self.ini_string = """
[defaults]
#FITS keyword Default value
IMAGETYP = LIGHT
EXPOSURE = 0.0
DATE-OBS = 2023-01-01
XBINNING = 1
GAIN = -1
EGAIN = -1
INSTRUME = None
TELESCOP = None
FOCNAME = None
FWHEEL = None
ROTATOR = None
XPIXSZ = 3.76
CCD-TEMP = -10
FOCALLEN = 540
FOCRATIO = 5.4
SITE = Papworth Everard
SITELAT = 52.2484
SITELONG = -0.1231
BORTLE = 4
SQM = 20.5
FILTER = No Filter
OBJECT = No target
FOCTEMP = 20
HFR = 1.6
FWHM = 0
SWCREATE = Unknown package
USEOBSDATE = TRUE
[filters]
#Filter code
Ha = 4663
SII = 4844
OIII = 4752
Red = 4649
Green = 4643
Blue = 4637
Lum = 2906
[secret]
#API key API endpoint
xxxxxxxxxxxxxxxx = https://www.lightpollutionmap.info/QueryRaster/
EMAIL_ADDRESS = [email protected]
[sites]
"""
        self.logger = logger
        # Load (or create) the config file; records whether it was changed.
        self.change = self.initialise_config()
def cast_section(self, section: Dict[Any, Any]) -> None:
"""
Recursively casts the values in a section dictionary to appropriate types.
This function iterates over each key-value pair in the section dictionary. If a value is a dictionary, it
recursively applies the casting to the nested dictionary. If a value is not a dictionary, it attempts to cast
the value to an appropriate type using the `cast_value` method.
Parameters:
section (Dict[Any, Any]): The section dictionary where the casting should be applied. The dictionary can have
keys and values of any type.
Returns:
None
"""
for key, value in section.items():
if isinstance(value, dict):
self.cast_section(value)
else:
section[key] = self.cast_value(value)
def initialise_config(self):
'''
Initialise the configuration file
If the configuration file doesn't exist, create it with the ini_string
If it does exist, read it into the config object
Parameters
----------
filename : str
The name of the configuration file
Returns
-------
None
'''
self.logger.info("")
self.logger.info("INITIALISING CONFIGURATION FILE")
change = False
if not os.path.exists(self.filename):
self.logger.info('No configuration file found')
self.logger.info('Creating configuration file: %s', self.filename)
self.config = ConfigObj(StringIO(self.ini_string),encoding='utf-8')
self.config.filename = self.filename
self.config.write()
change=True
else:
#read configuration file into config object
self.config = ConfigObj(self.filename,encoding='utf-8')
self.logger.info('Configuration file found')
#correct any problems with the config file
self.correct_config()
# Cast all values to their appropriate types
self.cast_section(self.config)
self.logger.info('Read Config object : %s', self.get_config())
return change
def update_config(self, section: str, subsection: Optional[str], keys_values: Dict[str, Any]) -> bool:
"""
Updates the configuration with a new section, subsection, and key-value pairs.
This function checks if the section exists in the configuration. If not, it creates it. If the subsection is not
provided, it creates a new one with the name 'sitex', where x is the number of existing subsections in the section
plus one. It then checks if the subsection exists in the section. If not, it creates it and adds the provided
key-value pairs. If any changes are made, it saves the updated configuration to the file.
Parameters:
section (str): The name of the section to be updated.
subsection (Optional[str]): The name of the subsection to be updated. If not provided, a new subsection is created.
keys_values (Dict[str, Any]): The key-value pairs to be added to the subsection.
Returns:
bool: True if any changes were made to the configuration, False otherwise.
"""
self.logger.info("")
self.logger.info("UPDATING CONFIGURATION FILE")
def strip_leading_spaces(multiline_string):
lines = multiline_string.split('\n')
stripped_lines = [line.lstrip() for line in lines]
return '\n'.join(stripped_lines)
change = False
# Check if the section exists, if not create it
if section not in self.config:
self.logger.info('Adding %s section', section)
self.config[section] = {}
change = True
# If subsection is empty, set it to 'sitex'
if not subsection:
subsection = 'site' + str(len(self.config[section]) + 1)
self.logger.info('No subsection passed adding %s sub-section', subsection)
change = True
# Add a blank line above the section and a comment below the section name
self.config.comments[section] = ['']
# Check if the subsection exists, if not create it and add keys and values
if subsection not in self.config[section]:
self.config[section][subsection] = keys_values
change = True
self.logger.info('%s not found in %s, adding it', subsection, section)
self.logger.info('Adding key values pairs %s', keys_values)
# Save the changes to the file
if change:
self.cast_section(self.config)
self.config.write()
self.logger.info('Updated %s', self.filename)
self.logger.info('Updated Config object: %s', dict(self.config))
else:
self.logger.info('No changes made to %s', self.filename)
return change
def get_config(self) -> dict:
"""
Returns a string representation of the configuration.
This function iterates over each section in the configuration. If a value is a Section, it recursively prints
the types of the nested section. If a value is not a Section, it adds the key, value, and type of the value to
the string. The string is indented to reflect the structure of the configuration.
Returns:
str: A string representation of the configuration, with keys, values, and types of values.
"""
self.logger.info("")
self.logger.info("GETTING CONFIGURATION FILE")
return dict(self.config)
def read_config(self):
return dict(ConfigObj(self.filename,encoding='utf-8'))
    def correct_config(self):
        """
        Repair and canonicalise the loaded configuration in place.

        Steps, in order: parse the built-in template; normalize section names
        (lower-case, no spaces) in both template and live config; reorder the
        live config's sections to match the template; upper-case and de-space
        the keys of [defaults]; enforce the template's key order/presence in
        [defaults]; tidy the [sites] subsections; finally write the repaired
        configuration back to disk.
        """
        self.logger.info("Starting to correct the configuration.")
        # Parse self.ini_string so it can serve as the canonical reference.
        ini_string_config = ConfigObj(self.ini_string.splitlines())
        self.logger.info("Parsed ini_string into a ConfigObj object.")
        # Normalize section names in the reference first so later
        # name-based comparisons use the same canonical form.
        self._normalize_section_names(ini_string_config)
        self.logger.info("Normalized section names in ini_string_config.")
        # Step 1: Normalize section names in self.config
        self._normalize_section_names(self.config)
        self.logger.info("Normalized section names in self.config.")
        # Step 2: Reconstruct self.config in the order of ini_string_config
        self._reconstruct_config(ini_string_config)
        self.logger.info("Reconstructed self.config in the order of ini_string_config.")
        # Step 3: [defaults] keys are FITS keywords, stored upper-case
        # without spaces.
        if 'defaults' in self.config:
            self.config['defaults'] = {key.upper().replace(' ', ''): value for key, value in self.config['defaults'].items()}
            self.logger.info("Converted keys in the 'defaults' section to upper case and removed whitespaces.")
        # Steps 4-5: make [defaults] match the template's key order and set.
        if 'defaults' in ini_string_config:
            self._ensure_default_keys_order(self.config['defaults'], ini_string_config['defaults'])
            self.logger.info("Ensured default keys order and existence.")
        # Tidy subsection names, keys and required fields in [sites].
        self._process_sites_section()
        self.logger.info("Processed 'sites' section.")
        # Step 6: persist the repaired configuration to disk.
        self.config.write()
        self.logger.info("Saved self.config to the config.ini file.")
        self.logger.info("Finished correcting the configuration.")
def _normalize_section_names(self, section):
"""
This method normalizes the names of the sections in the provided section object.
It iterates over the keys of the section, and if the key corresponds to a Section instance,
it normalizes the key by converting it to lower case and removing spaces, and then updates the section with the normalized key.
"""
self.logger.info("Starting to normalize section names.")
for key in list(section.keys()):
if isinstance(section[key], Section):
normalized_key = key.lower().replace(' ', '')
section[normalized_key] = section.pop(key)
self.logger.info(f"Normalized section name: {normalized_key}")
self.logger.info("Finished normalizing section names.")
def _reconstruct_config(self, reference_config):
"""
This method reconstructs the configuration based on a reference configuration.
It creates a new ConfigObj, then iterates over the sections in the reference configuration.
If a section exists in the current configuration, it is copied to the new configuration.
If not, the section from the reference configuration is used.
Finally, the current configuration is cleared and updated with the new configuration.
"""
self.logger.info("Starting to reconstruct the configuration.")
new_config = ConfigObj()
for section in reference_config:
if section in self.config:
new_config[section] = self.config[section]
self.logger.info(f"Copied section {section} from the current configuration.")
else:
new_config[section] = reference_config[section]
self.logger.info(f"Copied section {section} from the reference configuration.")
self.config.clear()
self.logger.info("Cleared the current configuration.")
self.config.update(new_config)
self.logger.info("Updated the current configuration with the new configuration.")
self.logger.info("Finished reconstructing the configuration.")
    def _process_sites_section(self):
        """
        Tidy the [sites] section in place.

        For every site subsection: strip surrounding whitespace from the
        subsection name, lower-case/de-space its keys, then rebuild it so
        that 'latitude', 'longitude', 'bortle' and 'sqm' come first (filled
        from the template's SITELAT/SITELONG/BORTLE/SQM defaults when
        missing), followed by any remaining keys in their original order.
        """
        self.logger.info("Starting to process 'sites' section.")
        # Re-parse the template to obtain fallback values for missing keys.
        ini_string_config = ConfigObj(self.ini_string.splitlines())
        if 'sites' in self.config:
            for subsection in list(self.config['sites'].keys()):
                # Only strip leading and trailing spaces from subsection names
                stripped_subsection = subsection.strip()
                if stripped_subsection != subsection:
                    self.config['sites'][stripped_subsection] = self.config['sites'].pop(subsection)
                    self.logger.info(f"Stripped spaces from subsection name: {stripped_subsection}")
                else:
                    stripped_subsection = subsection
                # Step 2: lower-case keys and drop internal spaces.
                # NOTE(review): this comprehension replaces the ConfigObj
                # Section with a plain dict; per-key comments attached to the
                # old Section are lost — confirm that is acceptable.
                self.config['sites'][stripped_subsection] = {
                    key.lower().replace(' ', ''): value
                    for key, value in self.config['sites'][stripped_subsection].items()
                }
                self.logger.info(f"Normalized keys in subsection: {stripped_subsection}")
                # Steps 3-4: required keys first, defaulted from the template.
                required_keys = ['latitude', 'longitude', 'bortle', 'sqm']
                default_values = {
                    'latitude': ini_string_config['defaults'].get('SITELAT', ''),
                    'longitude': ini_string_config['defaults'].get('SITELONG', ''),
                    'bortle': ini_string_config['defaults'].get('BORTLE', ''),
                    'sqm': ini_string_config['defaults'].get('SQM', '')
                }
                # Build the replacement with the required keys first...
                new_dict = {}
                for key in required_keys:
                    if key in self.config['sites'][stripped_subsection]:
                        new_dict[key] = self.config['sites'][stripped_subsection][key]
                    else:
                        new_dict[key] = default_values[key]
                # ...then append any remaining keys in their original order.
                for key, value in self.config['sites'][stripped_subsection].items():
                    if key not in new_dict:
                        new_dict[key] = value
                # Replace the old dictionary with the new one
                self.config['sites'][stripped_subsection] = new_dict
                self.logger.info(f"Updated subsection: {stripped_subsection} with new dictionary.")
        self.logger.info("Finished processing 'sites' section.")
def _ensure_default_keys_order(self, defaults_section, reference_defaults):
"""
This method ensures the order and presence of keys in the 'defaults' section of the configuration.
It creates an ordered dictionary, then iterates over the keys in the reference defaults.
If a key exists in the defaults section, it is copied to the ordered dictionary.
If not, the key from the reference defaults is used.
Finally, the defaults section is cleared and updated with the ordered dictionary.
"""
self.logger.info("Starting to ensure default keys order.")