-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupdate_records.py
107 lines (78 loc) · 2.79 KB
/
update_records.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import logging
import pprint
import simple_salesforce.exceptions as sf_exceptions
import sys
import uuid
from simple_salesforce import Salesforce, format_soql
from tqdm import tqdm
from utils import salesforce_login as login
logging.basicConfig(format="%(asctime)s : %(message)s", level=logging.ERROR)
#
# sf_credentials = (
# "<instance_url>",
# "<session_id>",
# )
#
sf_credentials = (
"",
"",
)
custom_field_name = "Archive_Id"
def update(sf: Salesforce):
custom_objects = []
s_out = []
s_err = []
row_set = []
describe_response = sf.describe()
for sobj in describe_response["sobjects"]:
if sobj["custom"] and sobj["name"][-3:] == "__c":
custom_objects.append({"name": sobj["name"]})
if custom_objects:
pbar = tqdm(total=len(custom_objects), desc="sf query", ncols=100)
for sobj in custom_objects:
pbar.update(1)
try:
query_str = format_soql(
f"SELECT Id, Name, {custom_field_name}__c FROM {sobj['name']}"
)
query_result = sf.query(query_str)
if query_result["totalSize"] > 0:
data = []
for record in query_result["records"]:
guid = str(uuid.uuid5(uuid.NAMESPACE_DNS, record["Id"]))
data.append(
{"Id": record["Id"], f"{custom_field_name}__c": guid}
)
row_set.append({"sobject": sobj["name"], "data": data})
except sf_exceptions.SalesforceMalformedRequest as e:
s_err.append({"sobject": sobj["name"], "error": e})
# raise
pbar.close()
if row_set:
pbar = tqdm(total=len(row_set), desc="sf update", ncols=100)
for row in row_set:
pbar.update(1)
try:
results = sf.bulk.__getattr__(row["sobject"]).update(
row["data"], batch_size=10000
)
s_out.append({"sobject": row["sobject"], "results": results})
except Exception as e:
s_err.append(
{"sobject": row["sobject"], "error": e, "data": row["data"]}
)
pbar.close()
print("\n*** Processing completed ***\n")
for msg in s_out:
pretty = pprint.pformat(msg["results"], indent=2, width=80)
logging.info(f"sObject: {msg['sobject']} ~\n{pretty}")
print("\n*** Errors ***\n")
for message in s_err:
logging.error(f"ERROR ~ {str(message['error']).strip()}")
if __name__ == "__main__":
sf = login(sf_credentials)
if not sf:
print("\n*** Not Logged into Salesforce ***\n")
sys.exit(1)
print("\n*** Logged into Salesforce ***\n")
update(sf)