diff --git a/scripts/S3_FSCK/s3_fsck_p0.py b/scripts/S3_FSCK/s3_fsck_p0.py
index 6b67e68..159f428 100644
--- a/scripts/S3_FSCK/s3_fsck_p0.py
+++ b/scripts/S3_FSCK/s3_fsck_p0.py
@@ -31,6 +31,9 @@
 COS = cfg["cos_protection"]
 PARTITIONS = int(cfg["spark.executor.instances"]) * int(cfg["spark.executor.cores"])
+# The arcindex maps the ARC schema to the hex value found in the ringkey, in the 24 bits preceding the last 8 bits of the key
+# e.g. FD770A344D6A6D259F92C500000000512040C070
+#      FD770A344D6A6D259F92C50000000051XXXXXX70 where XXXXXX = 2040C0
 arcindex = {"4+2": "102060", "8+4": "2040C0", "9+3": "2430C0", "7+5": "1C50C0", "5+7": "1470C0"}
 os.environ["PYSPARK_SUBMIT_ARGS"] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'
@@ -118,6 +121,7 @@ def sparse(f):
 def check_split(key):
+    """With srebuildd, check whether the RING key is split. Returns a dict: 'result' is False on error (422, 404, 50X, etc.), 'is_split' is True if the key is split."""
     url = "http://%s:81/%s/%s" % (SREBUILDD_IP, SREBUILDD_ARC_PATH, str(key.zfill(40)))
     r = requests.head(url)
     if r.status_code == 200:
@@ -126,9 +130,16 @@ def check_split(key):
 def blob(row):
+    """Return a list of dicts with the sproxyd input key, its subkey(s) if any, and the digkey"""
+    # set key from row._c2 (column 3), which contains an sproxyd input key
+    # input structure: (bucket name, s3 object key, sproxyd input key)
+    # FIXME: the naming of the method is terrible
     key = row._c2
+    # use the sproxyd input key to find out if the key is split or not
+    # check_split(key) transforms the input key into a RING key, and checks both that it exists AND whether it is a SPLIT.
     split = check_split(key)
     if not split['result']:
+        # If the key is not found, return a dict with the key, subkey and digkey set to NOK_HTTP
        return [{"key":key, "subkey":"NOK_HTTP", "digkey":"NOK_HTTP"}]
     if split['is_split']:
         try:
@@ -146,25 +157,61 @@ def blob(row):
             chunks = chunk + chunk
             chunkshex = chunks.encode("hex")
             rtlst = []
+            # the k value is the subkey; a subkey is the sproxyd input key for each stripe of the split
             for k in list(set(sparse(chunkshex))):
+                # "key": key == primary sproxyd input key of a split object
+                # "subkey": k == subkey sproxyd input key of an individual stripe of a split object
+                # "digkey": gen_md5_from_id(k)[:26] == md5 of the subkey
+                #           digkey: the unique part of a main chunk before service id,
+                #           arc schema, and class are appended
                 rtlst.append(
                     {"key": key, "subkey": k, "digkey": gen_md5_from_id(k)[:26]}
                 )
+            # If the key is split and the request is OK:
+            #   return a list of dicts with the key (primary sproxyd input key),
+            #   subkey (sproxyd input key of a split stripe) and
+            #   digkey (md5 of the subkey)
+            #   digkey: the unique part of a main chunk before service id,
+            #   arc schema, and class are appended
             return rtlst
+            # If the key is split and the request is not OK:
+            #   return a dict with the key (primary sproxyd input key)
+            #   with both subkey and digkey columns set to NOK
             return [{"key": key, "subkey": "NOK", "digkey": "NOK"}]
         except requests.exceptions.ConnectionError as e:
+            # If there is a Connection Error in the HTTP request:
+            #   return a dict with the key (primary sproxyd input key),
+            #   with both subkey and digkey set to NOK_HTTP
            return [{"key": key, "subkey": "NOK_HTTP", "digkey": "NOK_HTTP"}]
     if not split['is_split']:
+        # If the key is not split:
+        #   return a dict with the key (primary sproxyd input key),
+        #   subkey set to SINGLE and
+        #   digkey (md5 of the key)
+        #   digkey: the unique part of a main chunk before service id,
+        #   arc schema, and class are appended
         return [{"key": key, "subkey": "SINGLE", "digkey": gen_md5_from_id(key)[:26]}]
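The comments above describe the digkey as the first 26 hex characters of the md5 of an sproxyd input key, i.e. the part of a main chunk that precedes the trailing padding, service id, ARC schema and class. A minimal plain-Python illustration of that relationship, assuming gen_md5_from_id() (not shown in this diff) amounts to an uppercase MD5 hex digest of the key string:

import hashlib

def digkey_sketch(sproxyd_input_key):
    # hypothetical stand-in for gen_md5_from_id(key)[:26]
    return hashlib.md5(sproxyd_input_key.encode()).hexdigest().upper()[:26]

# Per the examples in these comments, a 40-char main chunk decomposes as
# digkey (26) + padding "0000" (4) + service id (2) + ARC schema (6) + class (2), e.g.
#   FD770A344D6A6D259F92C50000 + 0000 + 51 + 2040C0 + 70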
return [{"key": key, "subkey": "SINGLE", "digkey": gen_md5_from_id(key)[:26]}] new_path = os.path.join(PATH, RING, "s3-bucketd") files = "%s://%s" % (PROTOCOL, new_path) +# reading without a header, +# columns _c0, _c1, _c2 are the default column names of +# columns 1, 2, 3 for the csv +# input structure: (bucket name, s3 object key, sproxyd input key) +# e.g. test,48K_object.01,9BC9C6080ED24A42C2F1A9C78F6BCD5967F70220 +# Required Fields: +# - _c2 (sproxyd input key) df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files) +# repartition the dataframe to have the same number of partitions as the number of executors * cores df = df.repartition(PARTITIONS) +# Return a new Resilient Distributed Dataset (RDD) by applying a function to each element of this RDD. rdd = df.rdd.map(lambda x : blob(x)) +# Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results. Then transform it into a dataframe. dfnew = rdd.flatMap(lambda x: x).toDF() single = "%s://%s/%s/s3fsck/s3-dig-keys.csv" % (PROTOCOL, PATH, RING) +# write the dataframe to a csv file with a header +# output structure: (digkey, sproxyd input key, subkey if available) dfnew.write.format("csv").mode("overwrite").options(header="true").save(single) diff --git a/scripts/S3_FSCK/s3_fsck_p1.py b/scripts/S3_FSCK/s3_fsck_p1.py index 8d7b5ac..244d1a3 100644 --- a/scripts/S3_FSCK/s3_fsck_p1.py +++ b/scripts/S3_FSCK/s3_fsck_p1.py @@ -39,37 +39,77 @@ files = "%s://%s/%s/listkeys.csv" % (PROTOCOL, PATH, RING) +# FIXME skip native column names, rename on the fly. +# reading without a header, +# columns _c0, _c1, _c2, _c3 are the default column names of +# columns 1, 2, 3, 4 for the csv +# REQUIRED N, Y, N, Y +# input structure: (RING key, main chunk, disk, flag) +# e.g. 555555A4948FAA554034E155555555A61470C07A,8000004F3F3A54FFEADF8C00000000511470C070,g1disk1,0 +# Required Fields: +# - _c1 (main chunk) +# - _c3 (flag) df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files) -#list the ARC SPLIT main chunks +# list the ARC_REPLICATED main chunks from column 2 with service ID 50 and flag = 0 (new), into a single column (vector) named "_c1" +# FIXME rename column on the fly df_split = df.filter(df["_c1"].rlike(r".*000000..50........$") & df["_c3"].rlike("0")).select("_c1") +# In df_split, match keys which end in 70 from column with litteral name "_c1", to a new dataframe dfARCsingle +# FIXME explain what we're trying to accomplish by identifying such keys (0x50: ARC_REPLICATED, per UKS --> calls for a COS ending, not an ARC ending). Under what circumstances should such keys exist? dfARCsingle = df_split.filter(df["_c1"].rlike(r".*70$")) +# Filter out when strictly less than 4 stripe chunks (RING orphans), creating a new "count" column on the fly +# dfARCsingle now has two columns ("_c1", "count") dfARCsingle = dfARCsingle.groupBy("_c1").count().filter("count > 3") + +# in dfARCsingle, duplicate column named "_c1" into a new "ringkey" (aka. "main chunk") column +# dfARCsingle now has three columns ("_c1", "count", "ringkey") +# FIXME do the renaming some place else, e.g. 
diff --git a/scripts/S3_FSCK/s3_fsck_p1.py b/scripts/S3_FSCK/s3_fsck_p1.py
index 8d7b5ac..244d1a3 100644
--- a/scripts/S3_FSCK/s3_fsck_p1.py
+++ b/scripts/S3_FSCK/s3_fsck_p1.py
@@ -39,37 +39,77 @@
 files = "%s://%s/%s/listkeys.csv" % (PROTOCOL, PATH, RING)
+# FIXME skip native column names, rename on the fly.
+# reading without a header,
+# columns _c0, _c1, _c2, _c3 are the default column names of
+# columns 1, 2, 3, 4 for the csv
+# REQUIRED N, Y, N, Y
+# input structure: (RING key, main chunk, disk, flag)
+# e.g. 555555A4948FAA554034E155555555A61470C07A,8000004F3F3A54FFEADF8C00000000511470C070,g1disk1,0
+# Required Fields:
+# - _c1 (main chunk)
+# - _c3 (flag)
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
-#list the ARC SPLIT main chunks
+# list the ARC_REPLICATED main chunks from column 2 with service ID 50 and flag = 0 (new), into a single column (vector) named "_c1"
+# FIXME rename column on the fly
 df_split = df.filter(df["_c1"].rlike(r".*000000..50........$") & df["_c3"].rlike("0")).select("_c1")
+# In df_split, match keys which end in 70 from the column literally named "_c1", into a new dataframe dfARCsingle
+# FIXME explain what we're trying to accomplish by identifying such keys (0x50: ARC_REPLICATED, per UKS --> calls for a COS ending, not an ARC ending). Under what circumstances should such keys exist?
 dfARCsingle = df_split.filter(df["_c1"].rlike(r".*70$"))
+# Filter out keys with strictly fewer than 4 stripe chunks (RING orphans), creating a new "count" column on the fly
+# dfARCsingle now has two columns ("_c1", "count")
 dfARCsingle = dfARCsingle.groupBy("_c1").count().filter("count > 3")
+
+# in dfARCsingle, duplicate the column named "_c1" into a new "ringkey" (aka "main chunk") column
+# dfARCsingle now has three columns ("_c1", "count", "ringkey")
+# FIXME do the renaming some place else, e.g. upon dataframe creation, and be consistent about it
 dfARCsingle = dfARCsingle.withColumn("ringkey",dfARCsingle["_c1"])
+# in df_split (a vector of main chunks), filter the column named "_c1" (RING key main chunk) for the configured COS protection
 dfCOSsingle = df_split.filter(df["_c1"].rlike(r".*" + str(COS) + "0$"))
+
+# count the number of chunks in the column named "_c1" found for each key, creating the "count" column on the fly
+# dfCOSsingle now has two columns ("_c1", "count")
 dfCOSsingle = dfCOSsingle.groupBy("_c1").count()
+# in dfCOSsingle, duplicate the column named "_c1" into a new "ringkey" (aka "main chunk") column
+# dfCOSsingle now has three columns ("_c1", "count", "ringkey")
 dfCOSsingle = dfCOSsingle.withColumn("ringkey",dfCOSsingle["_c1"])
+# in dfCOSsingle, do an in-place substring operation on column "_c1": keep the first 26 characters of the main chunk (MD5 hash of the input key + 4 extra chars)
+# FIXME: say why we need those 4 extra characters (about 18% more weight than the 22-char md5 alone)
 dfCOSsingle = dfCOSsingle.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))
+# union the dfCOSsingle and dfARCsingle dataframes ("_c1", "count", "ringkey")
 dfARCsingle = dfARCsingle.union(dfCOSsingle)
-#list the ARC KEYS
+# list the ARC_SINGLE keys with service ID 51
+# repeat the same logic as before, with a different initial mask
+# Output is a three-column dataframe that will be unioned with the previous dataframe dfARCsingle
 df_sync = df.filter(df["_c1"].rlike(r".*000000..51........$")).select("_c1")
+# Match keys which end in 70 from column 2
 dfARCSYNC = df_sync.filter(df["_c1"].rlike(r".*70$"))
+# Filter out keys with strictly fewer than 4 stripe chunks (RING orphans)
 dfARCSYNC = dfARCSYNC.groupBy("_c1").count().filter("count > 3")
+# dfARCSYNC "_c1" column is duplicated into a "ringkey" column
 dfARCSYNC = dfARCSYNC.withColumn("ringkey",dfARCSYNC["_c1"])
+# in dfARCSYNC, do an in-place substring operation on column "_c1": keep the first 26 characters of the main chunk (MD5 hash of the input key + 4 extra chars)
 dfARCSYNC = dfARCSYNC.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))
+# filter "_c1" for the configured COS protection
 dfCOCSYNC = df_sync.filter(df["_c1"].rlike(r".*" + str(COS) + "0$"))
+# count the number of chunks in "_c1" found for each key
 dfCOCSYNC = dfCOCSYNC.groupBy("_c1").count()
+# dfCOCSYNC "_c1" column is duplicated into a "ringkey" column
 dfCOCSYNC = dfCOCSYNC.withColumn("ringkey",dfCOCSYNC["_c1"])
+# in dfCOCSYNC, do an in-place substring operation on column "_c1": keep the first 26 characters of the main chunk (MD5 hash of the input key + 4 extra chars)
 dfCOCSYNC = dfCOCSYNC.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))
+# union the two previous dataframes
 dfARCSYNC = dfARCSYNC.union(dfCOCSYNC)
+# union again the two outstanding dataframes dfARCSYNC and dfARCsingle into a dftotal dataframe
 dftotal = dfARCSYNC.union(dfARCsingle)
 total = "%s://%s/%s/s3fsck/arc-keys.csv" % (PROTOCOL, PATH, RING)
 dftotal.write.format("csv").mode("overwrite").options(header="true").save(total)
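A plain-Python sketch of what the masks above select, using the example main chunk from these comments (illustrative only; Spark's rlike() applies the same regexes column-wise):

import re

main_chunk = "8000004F3F3A54FFEADF8C00000000511470C070"  # example from the listkeys.csv comment above

print(bool(re.match(r".*000000..50........$", main_chunk)))  # False: service id is 51, not 50
print(bool(re.match(r".*000000..51........$", main_chunk)))  # True: service id 51
print(bool(re.match(r".*70$", main_chunk)))                  # True: ARC "70" class ending

# the substring(_c1, 1, length(_c1)-14) step keeps the 26-char digkey:
print(main_chunk[:-14])  # 8000004F3F3A54FFEADF8C0000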
"%s://%s/%s/s3fsck/arc-keys.csv" % (PROTOCOL, PATH, RING) +# reading with a header, the columns are named. +# columns digkey, sproxyd input key, subkey are the actual column names of +# columns 1, 2, 3 for the csv +# input structure: (digkey, sproxyd input key, subkey) +# e.g. 7359114991482315D0A5890000,BDE4B9BBEB45711EC2F1A9C78F6BCD59E02C6220,SINGLE +# Required Fields: +# - digkey +# - sproxyd input key dfs3keys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(s3keys) + + +# reading with a header, the columns are named. +# columns _c1, count, ringkey (main chunk) are the actual column names of +# columns 1, 2, 3 for the csv +# input structure: (digkey, count, ringkey (main chunk)) +# e.g. 907024530554A8DB3167280000,12,907024530554A8DB31672800000000512430C070 +# Required Fields: +# - digkey +# - ringkey (main chunk) dfringkeys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(ringkeys) +# rename the column _c1 to digkey, the next write will output a header that uses digkey instead of _c1 +# digkey: the unique part of a main chunk before service id, arc schema, and class are appended dfringkeys = dfringkeys.withColumnRenamed("_c1","digkey") +# inner join the s3keys (sproxyd input key) and ringkeys (the main chunk of the strip or replica) +# on the digkey column. The result will be a dataframe with the columns ringkey, digkey +# the inner join leftani will not return rows that are present in both dataframes, +# eliminating ringkeys (main chunks) that have metadata in s3 (not application orphans). +# digkey: the unique part of a main chunk before service id, arc schema, and class are appended +# ringkey: the main chunk of the strip or replica inner_join_false = dfringkeys.join(dfs3keys,["digkey"], "leftanti").withColumn("is_present", F.lit(int(0))).select("ringkey", "is_present", "digkey") + +# Create the final dataframe with only the ringkey (the main chunk of the strip or replica) df_final = inner_join_false.select("ringkey") + +# write the final dataframe to a csv file allmissing = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING) df_final.write.format("csv").mode("overwrite").options(header="false").save(allmissing) diff --git a/scripts/S3_FSCK/s3_fsck_p3.py b/scripts/S3_FSCK/s3_fsck_p3.py index 6243e85..77712f9 100644 --- a/scripts/S3_FSCK/s3_fsck_p3.py +++ b/scripts/S3_FSCK/s3_fsck_p3.py @@ -45,34 +45,62 @@ .config("spark.local.dir", PATH) \ .getOrCreate() - +# Use of the arcindex limits the inspection to a specific ARC protection scheme. +# If there were more than one cluster with different ARC protection schemes then this would limit the check to a specific scheme. +# FOOD FOR THOUGHT: limits finding keys which may have been written after a schema change or any bug did not honor the schema. 
diff --git a/scripts/S3_FSCK/s3_fsck_p3.py b/scripts/S3_FSCK/s3_fsck_p3.py
index 6243e85..77712f9 100644
--- a/scripts/S3_FSCK/s3_fsck_p3.py
+++ b/scripts/S3_FSCK/s3_fsck_p3.py
@@ -45,34 +45,62 @@
     .config("spark.local.dir", PATH) \
     .getOrCreate()
-
+# Use of the arcindex limits the inspection to a specific ARC protection scheme.
+# If there were more than one cluster with different ARC protection schemes, this would limit the check to a specific scheme.
+# FOOD FOR THOUGHT: it also limits finding keys which may have been written after a schema change, or by any bug that did not honor the schema.
+# The arcindex is a dictionary that maps the ARC protection scheme to the hex value found in the ringkey
 arcindex = {"4+2": "102060", "8+4": "2040C0", "9+3": "2430C0", "7+5": "1C50C0", "5+7": "1470C0"}
+# The arcdatakeypattern is a regular expression that matches the ARC data keys
 arcdatakeypattern = re.compile(r'[0-9a-fA-F]{38}70')

 def statkey(row):
+    """statkey takes a row from the dataframe and returns a tuple (key, status_code, size)"""
     key = row._c0
     try:
         url = "%s/%s" % (SREBUILDD_URL, str(key.zfill(40)))
         r = requests.head(url)
         if r.status_code == 200:
-            if re.search(arcdatakeypattern, key):
+            if re.search(arcdatakeypattern, key):  # Should consider changing this to match any entry in the arcindex
+                # For an ARC data key the estimated size is 12 times the X-Scal-Size reported by srebuildd for the key.
+                # At this point there is no longer access to the qty of keys found, so
+                # it simply computes based on the presumed schema of 12 chunks per key.
                 size = int(r.headers.get("X-Scal-Size", False))*12
             else:
+                # For a COS (replicated) key the estimated size is the reported X-Scal-Size plus
+                # X-Scal-Size times the COS protection, i.e. size * (1 + int(COS)).
+                # At this point there is no longer access to the qty of keys found, so
+                # it simply computes based on the presumed schema of int(COS) extra copies per key.
+                # If there are orphans which do not match the arcdatakeypattern they will
+                # be computed as if they were COS.
                 size = int(r.headers.get("X-Scal-Size",False)) + int(r.headers.get("X-Scal-Size",False))*int(COS)
             return ( key, r.status_code, size)
         else:
+            # If the key is not found (HTTP code != 200) then return the key, the status code, and 0 for the size
             return ( key, r.status_code, 0)
     except requests.exceptions.ConnectionError as e:
+        # If there is a connection error then return the key, "HTTP_ERROR", and 0 for the size
        return ( key, "HTTP_ERROR", 0)

 files = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING)
+
+# reading without a header,
+# column _c0 is the default column name of
+# column 1 for the csv
+# input structure: _c0 (main chunk)
+# e.g. 998C4DF2FC7389A7C82A9600000000512040C070
+# Required Fields:
+# - _c0 (main chunk)
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(files)
+
+# Create a resilient distributed dataset (RDD) from the dataframe (logical partitions of data)
+# The rdd is a collection of tuples returned from statkey: (key, status_code, size)
 rdd = df.rdd.map(statkey)
 #rdd1 = rdd.toDF()
+# size_computed is the sum of the size column of the rdd
 size_computed= rdd.map(lambda x: (2,int(x[2]))).reduceByKey(lambda x,y: x + y).collect()[0][1]
 string = "The total computed size of the not indexed keys is: %d bytes" % size_computed
 banner = '\n' + '-' * len(string) + '\n'
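The size arithmetic in statkey() can be isolated into a small function. A sketch, assuming COS = 2 and using the X-Scal-Size value that srebuildd reports for the key:

import re

COS = 2  # assumed cos_protection for the example
arcdatakeypattern = re.compile(r'[0-9a-fA-F]{38}70')

def estimated_size(key, x_scal_size):
    if re.search(arcdatakeypattern, key):
        # ARC data key: presume 12 chunks per key (e.g. 8+4)
        return x_scal_size * 12
    # otherwise treat it as COS: the main chunk plus COS extra replicas
    return x_scal_size + x_scal_size * COS

print(estimated_size("998C4DF2FC7389A7C82A9600000000512040C070", 1024))  # 12288
print(estimated_size("998C4DF2FC7389A7C82A9600000000522040C020", 1024))  # 3072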
diff --git a/scripts/S3_FSCK/s3_fsck_p4.py b/scripts/S3_FSCK/s3_fsck_p4.py
index df61463..f93dd75 100644
--- a/scripts/S3_FSCK/s3_fsck_p4.py
+++ b/scripts/S3_FSCK/s3_fsck_p4.py
@@ -68,9 +68,24 @@ def deletekey(row):

 files = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING)
+
+# reading without a header,
+# column _c0 is the default column name of
+# column 1 for the csv
+# input structure: _c0 (main chunk)
+# e.g. 998C4DF2FC7389A7C82A9600000000512040C070
+# Required Fields:
+# - _c0 (main chunk)
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(files)
+# rename the column _c0 (column 1) to ringkey
 df = df.withColumnRenamed("_c0","ringkey")
+
+# repartition the dataframe to the number of partitions (executors * cores)
 df = df.repartition(PARTITIONS)
+
+# map the deletekey function over the dataframe: blindly deletes the listed keys on the RING.
 rdd = df.rdd.map(deletekey).toDF()
+
 deletedorphans = "%s://%s/%s/s3fsck/deleted-s3-orphans.csv" % (PROTOCOL, PATH, RING)
+# write the dataframe to a csv file with the results of the deletekey function
 rdd.write.format("csv").mode("overwrite").options(header="false").save(deletedorphans)
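Since this pass deletes blindly, it can help to count and eyeball the candidate list produced by s3_fsck_p2.py before running it. A sketch with a made-up local path (the real scripts build this location from PROTOCOL, PATH and RING in the config):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("orphan-count-sketch").getOrCreate()

# hypothetical location; substitute the same PROTOCOL/PATH/RING the scripts read
candidates = "file:///var/tmp/DATA/s3fsck/s3objects-missing.csv"
df = spark.read.format("csv").option("header", "false").load(candidates)
print("keys that would be deleted: %d" % df.count())
df.show(10, truncate=False)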