-
Notifications
You must be signed in to change notification settings - Fork 2
/
mcInterface.py
1623 lines (1434 loc) · 58.8 KB
/
mcInterface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Version 1.18
"""an interface module to allow read/write access to Minecraft save files
"""
# Read and write are implemented now.
# Works with the Minecraft 1.13 and later file format
#
# Paul Spooner
# www.peripheralarbor.com
# Tell minecraft to re-calc the lighting when you change something
# True or False
RELIGHT = True
##############################################################
# Don't edit below here unless you know what you are doing #
##############################################################
import gzip
import time
import zlib
from struct import pack, unpack
COORD_MAP = {'x': 0, 'y': 1, 'z': 2}
ROT_MAP = {'yaw': 0, 'pitch': 1}
def raw_readout(raw_data, cap=144, start=0):
    """Dump a slice of a byte string for debugging.

    Each printed line shows the absolute offset, the character form,
    and the numeric value of one byte.
    raw_data : the byte string to inspect
    cap : maximum number of bytes to display, defaults to 144
    start : offset of the first byte to show, defaults to 0"""
    window = raw_data[start:start + cap]
    for offset, byte_val in enumerate(window, start=start):
        print(f"{offset} {chr(byte_val)} {byte_val}")
class NbtTagBase(object):
    """Common machinery shared by all NBT tag classes.

    Subclasses customize decoding by overriding the three class
    attributes below."""
    # byte count of the fixed-size payload
    payload_length = 0
    # struct format string used to decode/encode the payload
    payload_type = ''
    # numeric NBT tag id written ahead of tagged output
    tag_type = 0

    def getName(self):
        """Parse this tag's name from the data stream and store it."""
        # a big-endian short gives the byte length of the name
        name_length = unpack('>h', self.data_ob.get_data(2))[0]
        # the name itself follows immediately, utf-8 encoded
        self.name = str(self.data_ob.get_data(name_length), 'utf_8')
        return

    def encodeName(self):
        """Return the name as NBT bytes: 2-byte length then utf-8 text."""
        encoded = bytes(self.name, 'utf_8')
        return pack('>h', len(encoded)) + encoded

    def getPayload(self):
        """Parse the fixed-size payload from the data stream."""
        raw = self.data_ob.get_data(self.payload_length)
        self.payload = unpack(self.payload_type, raw)[0]
        return

    def encodePayload(self):
        """Return the payload encoded as NBT bytes."""
        return pack(self.payload_type, self.payload)

    def __init__(self, data_ob, named=True):
        """Read this tag's data out of the shared stream object.

        data_ob : the NbtData object that doles out raw bytes
        named : bool; when True a name precedes the payload in the stream
        """
        # keep a handle on the shared stream object
        self.data_ob = data_ob
        if named:
            # named tags carry a length-prefixed utf-8 name
            self.getName()
            self.named = True
        else:
            # unnamed tags (list members) get an empty name
            self.name = ''
            self.named = False
        # the payload always follows
        self.getPayload()

    def __str__(self):
        """Human-readable 'name: payload' form."""
        return self.name + ": " + str(self.payload)

    def encode(self, tagged=True):
        """Serialize the whole tag back to NBT bytes.

        tagged : when False (list members) the leading id byte is omitted
        """
        pieces = []
        if tagged:
            pieces.append(pack('>B', self.tag_type))
        if self.named:
            pieces.append(self.encodeName())
        pieces.append(self.encodePayload())
        return b''.join(pieces)
class NbtTag0(NbtTagBase):
    """TAG_End: the zero-byte marker that terminates a compound."""
    def __init__(self, data_ob, named=False):
        # TAG_End carries no name and no payload; just remember the stream
        self.data_ob = data_ob
        self.name = ''
        self.named = named
        self.payload = ''
    def encode(self, tagged=True):
        """TAG_End always serializes to a single zero byte."""
        return pack('>B', 0)
class NbtTag1(NbtTagBase):
    """TAG_Byte: one signed 8-bit integer."""
    tag_type = 1
    payload_type = '>b'
    payload_length = 1
class NbtTag2(NbtTagBase):
    """TAG_Short: one signed big-endian 16-bit integer."""
    tag_type = 2
    payload_type = '>h'
    payload_length = 2
class NbtTag3(NbtTagBase):
    """TAG_Int: one signed big-endian 32-bit integer."""
    tag_type = 3
    payload_type = '>i'
    payload_length = 4
class NbtTag4(NbtTagBase):
    """TAG_Long: one signed big-endian 64-bit integer."""
    tag_type = 4
    payload_type = '>q'
    payload_length = 8
class NbtTag5(NbtTagBase):
    """TAG_Float: one big-endian IEEE 754 single-precision float."""
    tag_type = 5
    payload_type = '>f'
    payload_length = 4
class NbtTag6(NbtTagBase):
    """TAG_Double: one big-endian IEEE 754 double-precision float."""
    tag_type = 6
    payload_type = '>d'
    payload_length = 8
class NbtTag7(NbtTagBase):
    """TAG_Byte_Array,
    also used as a base class for TAG_String and TAG_Int_Array"""
    tag_type = 7
    # byte width of a single array element; subclasses override this
    Element_Byte_Size = 1
    # struct format code for decoding one element
    payload_type = '>B'
    def get_payload_length(self):
        """get the length of the payload data"""
        # the byte array length is four bytes long
        raw_payload_length = self.data_ob.get_data(4)
        # interpret as an unsigned int (why would length be negative?)
        # the stored count is in elements, so scale to bytes here
        self.payload_length = unpack('>I', raw_payload_length)[0] * self.Element_Byte_Size
    def encode_payload_length(self):
        """encode the payload length (an element count, not a byte count)"""
        length = len(self.payload)  # * self.Element_Byte_Size
        raw_length = pack('>I', length)
        return raw_length
    def getPayload(self):
        """read in the payload of the byte array."""
        EBS = self.Element_Byte_Size
        # read in the raw data
        payload_raw = self.data_ob.get_data(self.payload_length)
        # this is a list which stores the payload as integers
        payload = []
        # decode one element every EBS bytes so this same method also
        # works for the wider int/long array subclasses
        if len(payload_raw) % EBS != 0:
            # delete this whole check if we don't have problems
            print("array size failed to sync")
        for idx in range(len(payload_raw) // EBS):
            start = idx * EBS
            end = start + EBS
            this_piece = payload_raw[start:end]
            val = unpack(self.payload_type, this_piece)[0]
            payload += [val]
        self.payload = payload
        return
    def encodePayload(self):
        """output the payload, encoded in NBT format"""
        output = b''
        for val in self.payload:
            output += pack(self.payload_type, val)
        return output
    def __init__(self, data_ob, named=True):
        """initialize the byte array

        data_ob : the NbtData object supplying raw bytes
        named : bool, if True this tag imports a name first"""
        # this crazy thing again, see notes in NbtTagBase
        self.data_ob = data_ob
        if named:
            self.getName()
            self.named = True
        else:
            self.name = ''
            self.named = False
        # we must read in the length of the array first
        self.get_payload_length()
        # then read in the actual data
        self.getPayload()
    def __str__(self):
        """Return a nice string representing the tag byte array contents.
        If the array is longer than sixteen characters (and it often is)
        truncate the readout and indicate the total number of entries."""
        # map the payload to the local namespace
        payload = self.payload
        # initialize the output with the name of the tag
        output = self.name + ": "
        # if the array is too long, list how many entries there are total.
        payload_size = len(payload)
        if payload_size > 16:
            printed_size = 16
            appendix = '... total ' + str(payload_size) + ' entries'
        else:
            printed_size = payload_size
            appendix = ''
        # add the entries to the output string
        for c in range(printed_size):
            output += str(payload[c]) + ' '
        # add the appendix onto the end... where it belongs!
        output += appendix
        return output
    def encode(self, tagged=True):
        """Serialize the array tag; `tagged=False` omits the id byte
        (used for list members)."""
        # initialize the byte string with the identifier byte
        if tagged:
            output = pack('>B', self.tag_type)
        # unless the tag isn't tagged (lists)
        else:
            output = b''
        if self.named:
            output += self.encodeName()
        # We must encode the payload first
        # because the payload length may change during encoding
        # for example, when encoding the block position array
        temp_payload_output = self.encodePayload()
        output += self.encode_payload_length()
        output += temp_payload_output
        return output
class NbtTag8(NbtTag7):
    """TAG_String"""
    tag_type = 8
    def get_payload_length(self):
        """get the length of the payload data"""
        # the string length is two bytes long
        raw_payload_length = self.data_ob.get_data(2)
        # interpret as an unsigned short (why would length be negative?)
        self.payload_length = unpack('>H', raw_payload_length)[0]
        return
    def encode_payload_length(self):
        """encode the payload length

        NOTE: relies on self.raw_payload, which encodePayload caches.
        NbtTag7.encode always calls encodePayload before this method,
        so the cache is populated in time."""
        # find the length in bytes (not characters)
        length = len(self.raw_payload)
        # pack it up
        raw_length = pack('>H', length)
        return raw_length
    def getPayload(self):
        """read in the payload of the string tag."""
        # read in the string data
        payload_raw = self.data_ob.get_data(self.payload_length)
        # convert to string and store
        self.payload = str(payload_raw, 'utf_8')
        return
    def encodePayload(self):
        """output the payload, encoded in NBT format"""
        # calculate the raw payload length, since utf_8 characters
        # may not correspond 1 to 1 with byte characters
        raw_payload = bytes(self.payload, 'utf_8')
        # cache the raw bytes for use in encode_payload_length
        self.raw_payload = raw_payload
        return raw_payload
    def __str__(self):
        """Return a nice string representing the tag string contents."""
        # easy enough
        payload = self.payload
        # combine and we're done
        output = self.name + ": " + payload
        return output
class NbtTag9(NbtTagBase):
    """TAG_List, this tag can contain other tags!
    All elements share one type and are stored without names."""
    tag_type = 9
    # Aiugh! Nightmare! Actually, not so bad once it's working properly.
    def getPayload(self):
        """read in all sub-tags into a list"""
        # map the get_data method for easy access
        get_data = self.data_ob.get_data
        # get the type of tag stored in the list
        contents_type = get_data(1)[0]
        self.contents_type = contents_type
        # map the appropriate tag constructor (tag_list is module-level)
        sub_tag = tag_list[contents_type]
        # get the number of elements in the list
        contents_length = unpack('>I', get_data(4))[0]
        payload = []
        for c in range(contents_length):
            # import the tags; list elements carry no names
            new_tag = sub_tag(self.data_ob, named=False)
            payload += [new_tag]
        self.payload = payload
    def encodePayload(self):
        """encode the element type byte, the count, then each unnamed element"""
        # first, encode the contents type
        output = pack('>B', self.contents_type)
        # then encode the number of elements
        output += pack('>I', len(self.payload))
        # now encode each sub-tag, without the per-tag id byte
        for c in self.payload:
            output += c.encode(tagged=False)
        return output
    def __str__(self):
        """make a string representation of the list"""
        # the starting line of the list
        output = self.name + ": List\n"
        # indent the string representation of sub-elements
        for tag in self.payload:
            this_str = tag.__str__()
            # split and indent each line, since results may have multiple lines
            str_list = this_str.splitlines()
            for idx in range(len(str_list)):
                str_list[idx] = ' ' + str_list[idx]
            str_result = '\n'.join(str_list) + '\n'
            output += str_result
        return output
class NbtTag10(NbtTagBase):
    """TAG_Compound, this tag can contain other tags also! The Horror!
    Sub-tags are named and stored in a dict keyed by name."""
    tag_type = 10
    # this one turned out to be easier than the TAG_List
    def getPayload(self):
        """read in all sub-tags into a dict"""
        # map the get_data method for easy access
        get_data = self.data_ob.get_data
        # store tags keyed by name
        payload = {}
        # import the tags
        while True:
            # get the key value (the next sub-tag's type id byte)
            key = get_data(1)[0]
            # generate a new tag (tag_list is module-level)
            new_tag = tag_list[key](self.data_ob)
            # if the tag is TAG_End, we're done
            if isinstance(new_tag, NbtTag0): break
            # otherwise, store the new tag in the dictionary
            payload.update({new_tag.name: new_tag})
        # store the payload
        self.payload = payload
    def encodePayload(self):
        """encode every sub-tag, then the TAG_End terminator byte"""
        # initialize the output string
        output = b''
        # string together all the sub tags
        payload = self.payload
        for key in payload:
            tag = payload[key]
            output += tag.encode()
        # add the stop-byte at the end
        output += pack('>B', 0)
        return output
    def __str__(self):
        """make a string representation of the compound"""
        # the starting line of the list
        output = self.name + ": Compound\n"
        # get the string representations of the sub-tags
        for key in self.payload:
            # the key is the name of the tag, the tag is the object
            tag = self.payload[key]
            # get the string representation of the tag
            this_str = str(tag)
            # since results may have multiple lines,
            # split and indent each line.
            str_list = this_str.splitlines()
            for idx in range(len(str_list)):
                str_list[idx] = ' ' + str_list[idx]
            # join the resulting strings back together
            str_result = '\n'.join(str_list) + '\n'
            # add it to the output
            output += str_result
        # When all the strings are added together, return it
        return output
class NbtTag11(NbtTag7):
    """TAG_Int_Array: a length-prefixed array of signed 32-bit ints."""
    payload_type = '>i'
    Element_Byte_Size = 4
    tag_type = 11
class NbtTag12(NbtTag7):
    """TAG_Long_Array; in modern chunks this backs the packed
    block-state index data."""
    tag_type = 12
    Element_Byte_Size = 8
    # NOTE(review): '>Q' reads the 64-bit words as unsigned, while NBT
    # longs are nominally signed. Re-packed index data is always
    # non-negative, so round-tripping works, but confirm raw values
    # with the high bit set behave as intended.
    payload_type = '>Q'
    def encodePayload(self):
        """Re-pack edited block-state indices (if present) into 64-bit
        words, then emit them via NbtTag7.encodePayload."""
        # check if this is a dirty block data array
        # (an `unpacked` attribute and a `palette` are attached
        # elsewhere when the indices have been expanded for editing)
        if hasattr(self, "unpacked"):
            # re-pack the integers: each index takes the smallest bit
            # width that can address the palette, with a minimum of 4
            bit_length = max(len(bin(len(self.palette)-1))-2,4)
            # bits actually used per 64-bit word (entries do not
            # straddle word boundaries)
            stride = (64 - (64 % bit_length))
            ids_per_int = 64 // bit_length
            # Collect binary representations of indices
            binary_chunks = [format(index, f'0{bit_length}b') for index in self.unpacked]
            # Reverse the order within each 64-bit chunk and concatenate
            # (indices are stored little-end-first within each word)
            binary_string = ''.join(
                ''.join(reversed(binary_chunks[i:i + ids_per_int]))
                for i in range(0, len(binary_chunks), ids_per_int)
            )
            # Convert to 64-bit integers
            self.payload = [int(binary_string[i:i + stride], 2) for i in range(0, len(binary_string), stride)]
        return NbtTag7.encodePayload(self)
# switch list for selecting the correct tag constructor,
# indexed by the NBT tag id byte: tag_list[n] handles tag type n
tag_list = [
    NbtTag0,   # TAG_End
    NbtTag1,   # TAG_Byte
    NbtTag2,   # TAG_Short
    NbtTag3,   # TAG_Int
    NbtTag4,   # TAG_Long
    NbtTag5,   # TAG_Float
    NbtTag6,   # TAG_Double
    NbtTag7,   # TAG_Byte_Array
    NbtTag8,   # TAG_String
    NbtTag9,   # TAG_List
    NbtTag10,  # TAG_Compound
    NbtTag11,  # TAG_Int_Array
    NbtTag12   # TAG_Long_Array
]
class NbtData(object):
    """Parses a byte string of NBT data into a list of tag objects."""

    def get_data(self, length):
        """Hand out the next `length` bytes of the raw source.

        Works much like file.read(length), but over an in-memory
        byte string with an internal cursor."""
        start = self.loc
        self.loc = start + length
        return self.raw[start:self.loc]

    def __init__(self, source_data, current_location=0):
        """Parse every tag found in source_data.

        source_data : a byte string containing raw NBT format data
        current_location : byte offset at which parsing begins, in case
            you want to start in the middle of a file
        """
        # keep the raw bytes and the read cursor
        self.raw = source_data
        self.loc = current_location
        total = len(source_data)
        parsed = []
        # consume tags until the data runs out
        while self.loc < total:
            # the first byte identifies the tag type
            tag_id = self.get_data(1)[0]
            # hand the stream to the matching constructor
            parsed.append(tag_list[tag_id](self))
        # keep the parsed top-level tags
        self.tags = parsed

    def __str__(self):
        """One string block per top-level tag, newline separated."""
        return ''.join(str(tag) + '\n' for tag in self.tags)

    def encode_data(self):
        """Serialize every top-level tag back to NBT bytes."""
        return b''.join(tag.encode() for tag in self.tags)
class NbtNew(NbtData):
    """A stand-in data source that yields zero bytes, used when
    building fresh tags whose contents will be clobbered afterwards."""
    def get_data(self, length):
        # serve the requested number of NUL bytes
        return pack('>B', 0) * length
    def __init__(self):
        # deliberately skip NbtData's parsing pass
        pass
class Region(object):
    """Parse a Minecraft Anvil region (.mca) file into usable containers.

    Layout: a 4 KiB chunk-location table, a 4 KiB timestamp table, then
    chunk data stored in 4 KiB sectors."""
    # Python 3.2 supports gzip.decompress
    # compression_types = {1:gzip.decompress, 2:zlib.decompress}
    # Python 3.1 does not... so I have excluded it
    compression_types = {2: zlib.decompress}

    def get_chunk(self, num):
        """Return the parsed NbtData container for the chunk.

        num : intra-region chunk index (0..1023)
        Returns None when the chunk is not populated in this region.
        Already extracted chunks are cached for quick access."""
        # check if the chunk is cached.
        if num in self.cached_chunks:
            cached_chunk = self.cached_chunks[num]
            return cached_chunk
        # if it's not cached, check to see if it is populated
        try:
            raw_offset = self.active_chunks_offsets[num]
        except KeyError:
            # the chunk is not populated; nothing to parse
            return None
        # the offset is stored as the distance from the beginning of the file
        # subtract 2 to get from the beginning of raw_block
        offset = (raw_offset - 2) * (2 ** 12)
        # decode the length of the data
        length = unpack('>I', self.raw_block[offset:offset + 4])[0]
        # decode the compression type
        compression_type = unpack('>b', self.raw_block[offset + 4:offset + 5])[0]
        # get the compressed chunk data. Note it is one shorter than normal
        compressed_chunk = self.raw_block[offset + 5:offset + 4 + length]
        # find the appropriate decompress method
        decompressor = self.compression_types[compression_type]
        # decompress the data
        expanded_data = decompressor(compressed_chunk)
        # parse the data into an NbtData container
        this_nbt = NbtData(expanded_data)
        # cache and return the container
        self.cached_chunks.update({num: this_nbt})
        return this_nbt

    def encode_chunk(self, num):
        """Save the specified chunk back into the internal raw data.

        The chunk must already be in self.cached_chunks. Re-encodes it,
        compresses it, pads it to a 4 KiB sector boundary, splices it
        into raw_block, and shifts the offsets of every chunk stored
        after it when the sector count changed."""
        # localize active_chunks_offsets and active_chunks_lengths
        offsets = self.active_chunks_offsets
        lengths = self.active_chunks_lengths
        # retrieve the chunk to save
        this_chunk = self.cached_chunks[num]
        # encode the chunk data in NBT byte format
        encoded_chunk = this_chunk.encode_data()
        # compress the data
        compressed_chunk = zlib.compress(encoded_chunk)
        # calculate the length of the compressed data,
        # plus the length bytes (4) plus the encoding byte (1)
        data_length = len(compressed_chunk) + 5
        # calculate the new length in 4 KiB sectors, rounding up
        # (an exact multiple gains one fully-padded sector; harmless)
        new_length = (data_length // (2 ** 12)) + 1
        # calculate how much to pad the data, to make it fit properly
        pad_length = (new_length * (2 ** 12)) - data_length
        padding_bytes = b'\x00' * pad_length
        # encode the length, subtract the four bytes for the length
        # field itself
        length_bytes = pack('>I', (data_length - 4))
        # encode the compression byte, 2 indicates zlib
        compression_id_byte = pack('>B', 2)
        # compile the whole block of data for insertion
        full_data_block = (length_bytes +
                           compression_id_byte +
                           compressed_chunk +
                           padding_bytes)
        # retrieve the offset distance and the old sector length
        offset = offsets[num]
        old_length = lengths[num]
        # find the difference between the old and new lengths
        length_difference = new_length - old_length
        # update the other file offsets to reflect the new block length
        if length_difference != 0:
            # update this internal length value
            lengths[num] = new_length
            # go through the active chunks
            for key in offsets:
                # get this chunk's offset
                chunk_offset = offsets[key]
                # if it is later in the file, shift it by the delta
                if chunk_offset > offset:
                    offsets[key] = chunk_offset + length_difference
        # update the raw data block
        raw = self.raw_block
        # the internal offset will be two less (missing header tables)
        # and converted to 4 KiB sectors
        start = (offset - 2) * (2 ** 12)
        end = (offset + old_length - 2) * (2 ** 12)
        # splice the new data between the untouched front and back
        front = raw[:start]
        back = raw[end:]
        self.raw_block = front + full_data_block + back
        # map back the offsets and lengths too
        self.active_chunks_offsets = offsets
        self.active_chunks_lengths = lengths
        # progress marker: one dot per chunk written
        print('.', end='', flush=True)
        return

    def encode_locations(self):
        """Encode the chunk locations and offsets into self.raw_locations"""
        # localize active_chunks_offsets and active_chunks_lengths
        offsets = self.active_chunks_offsets
        lengths = self.active_chunks_lengths
        # generate a new raw_locations string
        new_locs = b''
        # an all-zero entry marks a non-existent chunk
        blank_chunk = pack('>I', 0)
        # generate new data for each chunk
        for idx in range(2 ** 10):
            # compile the new 4 byte entry
            if idx in offsets:
                # only three bytes for the offset
                offset = pack('>I', offsets[idx])[1:]
                # and one byte for the length
                length = pack('>B', lengths[idx])
                this_chunk = offset + length
                new_locs += this_chunk
            else:
                new_locs += blank_chunk
        # store the data
        self.raw_locations = new_locs

    def encode_timestamps(self):
        """Stamp every cached (i.e. modified) chunk with the current time."""
        # map the timestamp string
        timestamps = self.raw_timestamps
        # get the current timestamp
        timestamp = int(time.time())
        # pack it as a 4 byte int
        raw_timestamp = pack('>I', timestamp)
        # update all the cached chunks
        for idx in self.cached_chunks:
            start = idx * 4
            end = start + 4
            front = timestamps[:start]
            back = timestamps[end:]
            timestamps = front + raw_timestamp + back
        # re-store the timestamp list
        self.raw_timestamps = timestamps

    def __init__(self, file_path):
        """Parse the region file.

        file_path : path to the .mca file; IOError propagates to the
        caller if the file cannot be opened."""
        # save the file path internally
        self.file_path = file_path
        # read in the file data; the context manager guarantees the
        # handle is closed even if a read fails
        with open(file_path, 'rb') as region_file:
            # 4 KiB chunk-location table
            region_locations = region_file.read(2 ** 12)
            # 4 KiB timestamp table
            region_timestamps = region_file.read(2 ** 12)
            # the remainder is chunk data in 4 KiB sectors
            region_chunks_raw = region_file.read()
        # active_chunks_offsets is a dict with:
        # index number : offset (in 4 KiB sectors, from file start)
        active_chunks_offsets = {}
        # active_chunks_lengths is a dict with:
        # index number : length (in 4 KiB sectors)
        active_chunks_lengths = {}
        # read in the offsets
        for num in range(2 ** 10):
            pos = num * 4
            data = region_locations[pos:pos + 4]
            # offset from the beginning of the file is 2 greater than
            # the offset from the beginning of the region_chunks_raw block
            offset = unpack('>I', b'\x00' + data[:3])[0]
            if offset > 0:
                active_chunks_offsets.update({num: offset})
                length = unpack('>B', data[3:])[0]
                active_chunks_lengths.update({num: length})
        # now we have a dict of all the active chunks.
        # save the raw data for writing out later
        self.raw_locations = region_locations
        self.raw_timestamps = region_timestamps
        self.raw_block = region_chunks_raw
        # save the active chunks, these are important!
        self.active_chunks_offsets = active_chunks_offsets
        self.active_chunks_lengths = active_chunks_lengths
        # initialize a cached_chunks dict,
        # for when chunks are extracted from the raw_block
        self.cached_chunks = {}

    def write(self):
        """Save all cached chunks back to the region file.

        Returns True on completion."""
        debug = self.file_path + " saving (dots are chunks)"
        print(debug)
        # map the cached chunk dictionary to the local namespace
        chunks = self.cached_chunks
        # write out each of the chunks to internal data
        for key in chunks:
            self.encode_chunk(key)
        # write the offset data to internal data
        self.encode_locations()
        # write the current timestamp on all chunks changed to internal data
        self.encode_timestamps()
        # write the internal data to a file; the context manager closes
        # the handle even if a write fails
        with open(self.file_path, 'wb') as region_file:
            region_file.write(self.raw_locations)
            region_file.write(self.raw_timestamps)
            region_file.write(self.raw_block)
        print(" completed")
        return True
# noinspection PyTypeChecker
class SaveFile(object):
"""Interface object for a minecraft save file.
Methods:
block(x,y,z): returns relevant block data. Accepts options.
surface(x,z): returns the surface block data. Accepts options.
Instance Variables:
save_file: string with file name
"""
# the top height of the map
# note that the y height goes down to -64
map_height: int = 320
map_bottom: int = -64
    def __init__(self, foldername):
        """Initialize and read in basic file data.

        foldername : path to the minecraft save folder"""
        # The file name is the save file that this object is pointed to.
        self.save_folder = foldername
        # the region objects are stored in a dict by filename.
        self.regions = {}
        # import the dat file
        # NOTE(review): read_dat and write_lock are defined elsewhere in
        # this file; presumably they populate self.dat and self.lock
        self.dat = None
        self.read_dat()
        self.lock = None
        self.write_lock()
def block_to_idx(self, blk_x, blk_y, blk_z):
"""Convert absolute block cords to an intra-section index."""
if blk_y > self.map_height or blk_y < self.map_bottom:
return None
idx = (blk_y % 16) * 16 * 16 + (blk_z % 16) * 16 + (blk_x % 16)
return idx
def get_region(self, reg_x, reg_z):
"""Return a Region object.
If the region is loaded, get the object from the cache.
Otherwise, load the data and cache it."""
# map self.regions
regions = self.regions
# derive the appropriate file name
file_name = 'r.' + str(reg_x) + '.' + str(reg_z) + '.mca'
if file_name in regions:
return regions[file_name]
else:
# compose the path to the file
file_path = self.save_folder + '/region/' + file_name
try:
new_region = Region(file_path)
except IOError:
new_region = None
self.regions.update({file_name: new_region})
return new_region
@staticmethod
def block_to_chunk(blk_x, blk_z):
"""Convert block to chunk coordinates, return tuple."""
chunk_x = int(blk_x // 16)
chunk_z = int(blk_z // 16)
return chunk_x, chunk_z
@staticmethod
def chunk_to_region(chk_x, chk_z):
"""Convert chunk to region coordinates, return tuple."""
reg_x = chk_x // 32
reg_z = chk_z // 32
return reg_x, reg_z
@staticmethod
def chunk_to_num(chk_x, chk_z):
"""Convert chunk coordinates to an intra-region index."""
num = (chk_x % 32) + (chk_z % 32) * 32
return num
def get_chunk(self, chk_x, chk_z):
"""Return the chunk at the chunk coordinates (x,z).
If the chunk is not present int the save file, return None."""
# find the region the chunk is stored in, and retrieve it.
reg_x, reg_z = self.chunk_to_region(chk_x, chk_z)
reg = self.get_region(reg_x, reg_z)
# check to see if the region actually exists.
# if it doesn't return None
if reg is None:
return None
# calculate the chunk numeric index
chunk_idx = self.chunk_to_num(chk_x, chk_z)
# retrieve the chunk from the region
# if it isn't in the region, this "chunk" will be None
this_chunk = reg.get_chunk(chunk_idx)
return this_chunk
def get_chunk_from_cord(self, blk_x, blk_z):
"""Return the chunk containing the block coordinates (x,z)."""
chu_x, chu_z = self.block_to_chunk(blk_x, blk_z)
this_chunk = self.get_chunk(chu_x, chu_z)
return this_chunk
    def get_block_data(self, blk_x, blk_y, blk_z):
        """Look up the section data holding the given block.

        Returns a (palette, data, block_states) triple:
        - (None, None, None) when the containing chunk does not exist
        - (0, sections, None) when the y section index is beyond the
          stored sections (callers treat this as air)
        - otherwise: the section's palette tag payload, its packed
          'data' tag (or None when absent), and the 'block_states'
          compound payload (or None when absent)."""
        this_chunk = self.get_chunk_from_cord(blk_x, blk_z)
        # if the chunk doesn't exist, return None
        if this_chunk is None:
            return None, None, None
        # map the dict storing the relevant tag data
        sections = this_chunk.tags[0].payload['sections'].payload
        # +64 or +63? what is the proper offset?
        # NOTE(review): assumes sections are ordered bottom-up from
        # y = -64 and that list position matches section index - confirm
        idx = (blk_y+64) // 16
        if idx < len(sections):
            section = sections[idx].payload
        else:
            # no section means the block is air
            return 0, sections, None
        # Grab the palette, we need it later, and also now
        if 'block_states' in section:
            block_states = section['block_states'].payload
            palette = block_states['palette'].payload
            if 'data' in block_states:
                data = block_states['data']
            else: data = None
        else:
            block_states = None
            data = None
            palette = None
        return palette, data, block_states
def block(self, blk_x, blk_y, blk_z, fetch_properties=False):
"""Return relevant block data in a dict.
The keys to the dict are the option key characters.
If no options are selected, return the block id as a raw int
because why do we need all this noise about block dictionaries?
x, y, z : coordinates of target block.
options : string containing option key characters (below) in any order.
It's all in flux
"""
palette, data, block_states = self.get_block_data(blk_x, blk_y, blk_z)
if palette is None:
# No chunk!
return None
if data is None:
# No chunk!
return {'B': 'minecraft:air'}
if palette == 0:
# Unpopulated section
return {'B': 'minecraft:air'}
if len(palette) == 1:
# There's just one block type in this section, so we know what it has to be
palette_data = palette[0].payload
block_data = {'B': palette_data['Name'].payload}
if not fetch_properties:
return block_data
if 'Properties' in palette_data:
props = palette_data['Properties'].payload
for propert in props:
block_data[propert] = props[propert].payload
return block_data
# just do something like this?
blkidx = self.block_to_idx(blk_x, blk_y, blk_z)
if blkidx is None: return None
if not hasattr(data, "unpacked"):
self.unpack_block_states(data, palette)
foundpltid = data.unpacked[blkidx]
# don't need any of this?
# keep for readability in the future
'''idx_offset = self.block_to_idx(blk_x, blk_y, blk_z)
# 64 bit
bit_stride = max(4, len(f'{len(palette) - 1:b}'))
ids_per_int = 64 // bit_stride
int_idx = idx_offset // ids_per_int
bin_int = f'{data[int_idx]:064b}'
int_offset = idx_offset % ids_per_int
start = str((int_offset + 1) * -bit_stride)
if int_offset == 0: end = ''
else: end = str(int_offset * -bit_stride)
block_ID = int(eval(f'bin_int[{start}:{end}]'), base=2)'''
palette_data = palette[foundpltid].payload
block_data = {'B': palette_data['Name'].payload}
if not fetch_properties:
return block_data
if 'Properties' in palette_data:
props = palette_data['Properties'].payload
for propert in props:
block_data[propert] = props[propert].payload
return block_data
def get_map_data(self):
"""returns a dictionary which is the mutable map data. Change and save to update."""
map_payload = self.dat.tags[0].payload['Data'].payload
# print(map_payload)
return map_payload
def get_player(self):
"""returns a dictionary which is the mutable player data. Change and save to update."""
player_payload = self.get_map_data()['Player'].payload
# print(player_payload)
return player_payload
def get_player_pos(self):
pos_data = self.get_player()['Pos'].payload
pos = [pos_data[i].payload for i in range(3)]
return pos
def get_player_block(self):
pos_float = self.get_player_pos()
for k,i in enumerate(pos_float):
if i < 0:
pos_float[k] -= 1
pos_int = [int(i) for i in pos_float]
return pos_int