Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tasks to extract and upload files (Salesforce Files) #3800

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cumulusci/cumulusci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,14 @@ tasks:
class_path: cumulusci.tasks.vlocity.vlocity.VlocityDeployTask
description: "Executes the `vlocity packDeploy` command against an org"
group: OmniStudio
retrieve_files:
description: Retrieve documents that have been uploaded to a library in Salesforce CRM Content or Salesforce Files.
class_path: cumulusci.tasks.salesforce.salesforce_files.RetrieveFiles
group: Salesforce Metadata
upload_files:
description: Upload documents (files) to a Salesforce org.
class_path: cumulusci.tasks.salesforce.salesforce_files.UploadFiles
group: Salesforce Metadata
flows:
ci_beta:
group: Continuous Integration
Expand Down
182 changes: 181 additions & 1 deletion cumulusci/tasks/salesforce/salesforce_files.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from cumulusci.tasks.salesforce import BaseSalesforceApiTask

import requests
import os
import json

class ListFiles(BaseSalesforceApiTask):
task_docs = """
Expand Down Expand Up @@ -28,3 +30,181 @@ def _run_task(self):
)

return self.return_values

class RetrieveFiles(BaseSalesforceApiTask):
#task.org_config
task_docs = """
This task downloads all the documents (files) that have been uploaded to a library in Salesforce CRM Content or Salesforce Files.
Use the task display_files in order to view the files that are available to download.
"""
task_options = {
"output_directory": {
"description": "The directory where the files will be saved. By default, files will be saved in Downloads",
"required": False,
},
"file_id_list": {
"description": "Specify a comma-separated list of Ids files to download. All the availables files are downloaded by default. Use display_files task to view files and their Ids",
"required": False,
},
}

def _init_options(self, kwargs):
super(RetrieveFiles, self)._init_options(kwargs)

if "output_directory" not in self.options:
self.options["output_directory"] = "Files"

if "file_id_list" not in self.options:
self.options["file_id_list"] = ""

self.return_values = []

def _run_task(self):
self.logger.info(f"Retrieving files from the specified org..")
output_directory = self.options["output_directory"]
self.logger.info(f"Output directory: {output_directory}")

query_condition = ''

file_id_list = self.options["file_id_list"]

if file_id_list: # If the list of Ids of files to be downloaded is specify, fetch only those files.
items_list = [f"'{item.strip()}'" for item in file_id_list.split(",")]
query_condition = f"AND ContentDocumentId IN ({','.join(items_list)})"

available_files = [
{
"Id": result["Id"],
"FileName": result["Title"],
"FileType": result["FileType"],
"VersionData": result["VersionData"],
"ContentDocumentId": result["ContentDocumentId"]
}
for result in self.sf.query(
f'SELECT Title, Id, FileType, VersionData, ContentDocumentId FROM ContentVersion WHERE isLatest=true {query_condition}'
)["records"]
]

self.logger.info(f"Found {len(available_files)} files in the org.\n")
self.logger.info(f'Files will be downloaded in the directory: {self.options["output_directory"]} \n' )

for current_file in available_files:
versionData = current_file["VersionData"]
url = f"{self.org_config.instance_url}/{versionData}"
headers = {"Authorization": f"Bearer {self.org_config.access_token}"}

response = requests.get(url, headers=headers, stream=True)
response.raise_for_status()

file_extension = current_file["FileType"].lower()
local_filename = f"{current_file['FileName']}.{file_extension}"
local_filename = os.path.join(output_directory, local_filename)

self.logger.info(f"Downloading: {current_file['FileName']}")

file_exists = os.path.exists(local_filename)

if file_exists:
file_name = current_file['FileName']
self.logger.info(f'A file with the name {file_name} already exists. in the directory. This file will be renamed.')
if file_exists:
count = 1
while True:
local_filename = os.path.join(output_directory, f"{current_file['FileName']} ({count}).{file_extension}")
if not os.path.exists(local_filename):
break
count+=1

os.makedirs(os.path.dirname(local_filename), exist_ok=True) # Create the folder if it doesn't exist

with open(local_filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)

self.logger.info('\n')

self.return_values = available_files
return self.return_values

class UploadFiles(BaseSalesforceApiTask):
task_docs = """
This task uploads files to a Salesforce org.
"""
task_options = {
"path": {
"description": "The directory to upload files from. By default, files under 'Files' folder are uploaded.",
"required": False,
},
"file_list": {
"description": "Specify a comma-separated list of files to upload. All the files in the specified directory are uploaded by default.",
"required": False,
},
}

def _init_options(self, kwargs):
super(UploadFiles, self)._init_options(kwargs)

if "path" not in self.options:
self.options["path"] = "Files"

if "file_list" not in self.options:
self.options["file_list"] = ""

self.return_values = []

def _run_task(self):
path = self.options["path"]
file_list = self.options["file_list"]

# Salesforce REST API endpoint for uploading files
api_version = self.project_config.project__package__api_version
url = f"{self.org_config.instance_url}/services/data/v{api_version}/sobjects/ContentVersion/"

# Prepare the request headers
headers = {
"Authorization": f"Bearer {self.org_config.access_token}",
}

if file_list:
files_to_upload = file_list.split(',')
else:
files_to_upload = os.listdir(path)

for filename in files_to_upload:
file_path = os.path.join(path, filename.strip())

if os.path.isfile(file_path):
with open(file_path, 'rb') as file:
# Construct the payload for the entity content
title = os.path.splitext(os.path.basename(file_path))[0] # File name

entity_content = {
'Title': title,
'PathOnClient': file_path,
}

self.return_values.append(entity_content)

files = {
'entity_content': ('', json.dumps(entity_content), 'application/json'),
'VersionData': (filename, file, 'application/octet-stream')
}

try:
response = requests.post(url, headers=headers, files=files)
response.raise_for_status() # Raise an exception for HTTP errors

# Parse the response JSON
response_json = response.json()

if response.status_code == 201: # Upload successful
content_version_id = response_json["id"]
self.logger.info(f"File '{filename}' uploaded successfully. ContentVersion Id: {content_version_id}")
else:
self.logger.error(f"Failed to upload file '{filename}': {response_json}")
except requests.RequestException as e:
self.logger.error(f"Error uploading file '{filename}': {e}")
self.logger.error(e.response.content) # Print response content in case of error

return self.return_values # Returns a list containing all the files uplaoded.
147 changes: 144 additions & 3 deletions cumulusci/tasks/salesforce/tests/test_salesforce_files.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from unittest.mock import Mock

from cumulusci.tasks.salesforce.salesforce_files import ListFiles
import unittest
from unittest.mock import Mock, patch, call, mock_open
import os
import requests
import json
from cumulusci.tasks.salesforce.salesforce_files import ListFiles, UploadFiles, RetrieveFiles
from cumulusci.tasks.salesforce.tests.util import create_task


Expand All @@ -24,3 +27,141 @@ def test_display_files(self):
{"Id": "0PS000000000000", "FileName": "TEST1", "FileType": "TXT"},
{"Id": "0PS000000000001", "FileName": "TEST2", "FileType": "TXT"},
]

class TestRetrieveFiles(unittest.TestCase):

@patch('requests.get')
@patch('os.path.exists')
@patch('os.makedirs')
@patch('builtins.open')
def test_run_task(self, mock_open, mock_makedirs, mock_exists, mock_get):
# Mock Salesforce query response
mock_sf = Mock()
mock_sf.query.return_value = {
"totalSize": 2,
"records": [
{
"Title": "TEST1",
"Id": "0PS000000000000",
"FileType": "TXT",
"VersionData": "version1",
"ContentDocumentId": "doc1"
},
{
"Title": "TEST2",
"Id": "0PS000000000001",
"FileType": "TXT",
"VersionData": "version2",
"ContentDocumentId": "doc2"
}
]
}

# Mock org config
mock_org_config = Mock()
mock_org_config.instance_url = 'https://test.salesforce.com'
mock_org_config.access_token = 'testtoken'

# Create task with mocked Salesforce and org config
task = create_task(RetrieveFiles, {"output_directory": "test_dir", "file_id_list": ""})
task.sf = mock_sf
task.org_config = mock_org_config

# Mock file existence and request response
mock_exists.return_value = False
mock_response = Mock()
mock_response.iter_content.return_value = [b'chunk1', b'chunk2']
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response

# Run the task
task._run_task()

# Check if query was called with correct SOQL
mock_sf.query.assert_called_once_with(
'SELECT Title, Id, FileType, VersionData, ContentDocumentId FROM ContentVersion WHERE isLatest=true '
)

# Check if files are downloaded
expected_calls = [
call('https://test.salesforce.com/version1', headers={'Authorization': 'Bearer testtoken'}, stream=True),
call('https://test.salesforce.com/version2', headers={'Authorization': 'Bearer testtoken'}, stream=True)
]
mock_get.assert_has_calls(expected_calls, any_order=True)

# Check if files are written correctly
mock_open.assert_any_call(os.path.join('test_dir', 'TEST1.txt'), 'wb')
mock_open.assert_any_call(os.path.join('test_dir', 'TEST2.txt'), 'wb')

# Check if return values are set correctly
self.assertEqual(task.return_values, [
{"Id": "0PS000000000000", "FileName": "TEST1", "FileType": "TXT", "VersionData": "version1", "ContentDocumentId": "doc1"},
{"Id": "0PS000000000001", "FileName": "TEST2", "FileType": "TXT", "VersionData": "version2", "ContentDocumentId": "doc2"}
])


class TestUploadFiles(unittest.TestCase):

@patch('requests.post')
@patch('os.listdir')
@patch('os.path.isfile')
@patch('builtins.open', new_callable=mock_open, read_data=b'test data')
def test_run_task(self, mock_open, mock_isfile, mock_listdir, mock_post):
# Mock org config and project config
mock_org_config = Mock()
mock_org_config.instance_url = 'https://test.salesforce.com'
mock_org_config.access_token = 'testtoken'

mock_project_config = Mock()
mock_project_config.project__package__api_version = '50.0'

# Create task with mocked configs
task = create_task(UploadFiles, {"path": "test_dir", "file_list": ""})
task.org_config = mock_org_config
task.project_config = mock_project_config

# Mock file discovery
mock_listdir.return_value = ['file1.txt', 'file2.txt']
mock_isfile.side_effect = lambda filepath: filepath in [
os.path.join('test_dir', 'file1.txt'),
os.path.join('test_dir', 'file2.txt')
]

# Mock requests response
mock_response = Mock()
mock_response.status_code = 201
mock_response.json.return_value = {"id": "contentversionid"}
mock_post.return_value = mock_response

# Run the task
task._run_task()

mock_open.assert_any_call(os.path.join('test_dir', 'file1.txt'), 'rb')
mock_open.assert_any_call(os.path.join('test_dir', 'file2.txt'), 'rb')

# Check if requests.post was called correctly
expected_calls = [
call(
'https://test.salesforce.com/services/data/v50.0/sobjects/ContentVersion/',
headers={'Authorization': 'Bearer testtoken'},
files={
'entity_content': ('', json.dumps({'Title': 'file1', 'PathOnClient': os.path.join('test_dir', 'file1.txt')}), 'application/json'),
'VersionData': ('file1.txt', mock_open(), 'application/octet-stream')
}
),
call(
'https://test.salesforce.com/services/data/v50.0/sobjects/ContentVersion/',
headers={'Authorization': 'Bearer testtoken'},
files={
'entity_content': ('', json.dumps({'Title': 'file2', 'PathOnClient': os.path.join('test_dir', 'file2.txt')}), 'application/json'),
'VersionData': ('file2.txt', mock_open(), 'application/octet-stream')
}
)
]

self.assertEqual(task.return_values, [
{'Title': 'file1', 'PathOnClient': os.path.join('test_dir', 'file1.txt')},
{'Title': 'file2', 'PathOnClient': os.path.join('test_dir', 'file2.txt')}
])

mock_post.assert_has_calls(expected_calls, any_order=True)
Loading