Skip to content

Commit

Permalink
Fix cron jobs on k8s (google#3281)
Browse files Browse the repository at this point in the history
  • Loading branch information
hogo6002 authored Aug 15, 2023
1 parent acd736d commit e0d397e
Show file tree
Hide file tree
Showing 16 changed files with 347 additions and 15 deletions.
Empty file added isort
Empty file.
3 changes: 1 addition & 2 deletions src/clusterfuzz/_internal/bot/tasks/cron/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ def main():

app_id = utils.get_application_id()
timestamp = datetime.datetime.utcnow().strftime('%Y-%m-%d-%H:%M:%S')
output_url_prefix = (
f'gs://testing-{backup_bucket}/datastore-backups/{timestamp}')
output_url_prefix = (f'gs://{backup_bucket}/datastore-backups/{timestamp}')

body = {
'output_url_prefix': output_url_prefix,
Expand Down
1 change: 1 addition & 0 deletions src/clusterfuzz/_internal/bot/tasks/cron/corpus_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,5 @@ def main():
corpus_backup_bucket_name)
except:
logs.log_error(f'Failed to make {target} corpus backup public.')
logs.log('Corpus backup succeeded.')
return True
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 Google LLC
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 Google LLC
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -394,5 +394,5 @@ def main():
update_job_weights()

store_current_weights_in_bigquery()
logs.log_error(f'Fuzzer and job weights succeeded. {str(e)}')
logs.log(f'Fuzzer and job weights succeeded.')
return True
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Google LLC
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
13 changes: 13 additions & 0 deletions src/clusterfuzz/_internal/bot/tasks/cron/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
318 changes: 318 additions & 0 deletions src/clusterfuzz/_internal/bot/tasks/cron/helpers/bot_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Library to manage bots using GCE instance templates and groups."""

import time

import google_auth_httplib2
import googleapiclient
from googleapiclient.discovery import build
import httplib2

from clusterfuzz._internal.base import retry
from clusterfuzz._internal.google_cloud_utils import credentials

RETRY_COUNT = 8
RETRY_DELAY = 4

REQUEST_TIMEOUT = 180

_SCOPES = [
'https://www.googleapis.com/auth/cloud-platform',
]


class BotManagerError(Exception):
"""Base exception class."""


class OperationError(BotManagerError):
"""Errors during an operation."""


class RequestError(BotManagerError):
"""Errors during a request."""


class NotFoundError(RequestError):
"""Not found."""


class AlreadyExistsError(RequestError):
"""Already exists."""


class RetryableError(RequestError):
"""Retryable request error."""


class BotManager:
"""Manager for bots."""

def __init__(self, project_id, zone):
self.project_id = project_id
self.zone = zone

creds = credentials.get_default(scopes=_SCOPES)[0]
http = google_auth_httplib2.AuthorizedHttp(
creds, http=httplib2.Http(timeout=REQUEST_TIMEOUT))

self.compute = build('compute', 'v1', http=http, cache_discovery=False)

def instance_group(self, name):
"""Get an InstanceGroup resource with the given name."""
return InstanceGroup(name, self)

def instance_template(self, name):
"""Get an InstanceTemplate resource with the given name."""
return InstanceTemplate(name, self)


class Resource:
"""Represents a resource."""

_OPERATION_POLL_SECONDS = 5

def __init__(self, name, manager):
self.name = name
self.manager = manager

@property
def compute(self):
return self.manager.compute

@property
def project_id(self):
return self.manager.project_id

@property
def zone(self):
return self.manager.zone

def get(self):
raise NotImplementedError

def exists(self):
"""Return whether or not the resource exists."""
try:
self.get()
return True
except NotFoundError:
return False

def _wait_for_operation(self, operation):
"""Wait for an operation to complete."""
while True:
if operation['status'] == 'DONE':
if 'error' in operation:
raise OperationError(operation['error'])

return operation

time.sleep(self._OPERATION_POLL_SECONDS)

if 'zone' in operation:
operation = self.compute.zoneOperations().get(
project=self.project_id,
zone=self.zone,
operation=operation['name']).execute()
else:
operation = self.compute.globalOperations().get(
project=self.project_id, operation=operation['name']).execute()

def _identity(self, response):
"""Identify function for convenience."""
return response

@retry.wrap(
RETRY_COUNT,
RETRY_DELAY,
'handlers.cron.helpers.bot_manager.Resource.execute',
exception_types=[RetryableError])
def execute(self, request, result_proc=None):
"""Execute a request."""
if result_proc is None:
result_proc = self._wait_for_operation

try:
response = request.execute()
except googleapiclient.errors.HttpError as e:
if e.resp.status in [400, 403, 500, 503]:
raise RetryableError(str(e))
if e.resp.status == 404:
raise NotFoundError(str(e))
if e.resp.status == 409:
raise AlreadyExistsError(str(e))

raise RequestError(str(e))

return result_proc(response)


class InstanceGroup(Resource):
"""Instance group."""

# At least 80% of the instances should've been created. Some errors may be
# expected because of limited resources in the zone.
MIN_INSTANCES_RATIO = 0.8
MAX_ERROR_RATIO = 1.0 - MIN_INSTANCES_RATIO

def _wait_for_instances(self):
"""Wait for instance actions to complete."""
while True:
num_instances = 0
instances_ready = 0
errors = []

for instance in self.list_managed_instances():
num_instances += 1

if instance['currentAction'] == 'NONE':
instances_ready += 1
elif 'lastAttempt' in instance and 'errors' in instance['lastAttempt']:
errors.append(instance['lastAttempt']['errors'])

if instances_ready >= max(1, num_instances * self.MIN_INSTANCES_RATIO):
return

if len(errors) > num_instances * self.MAX_ERROR_RATIO:
raise OperationError(errors)

time.sleep(1)

def _handle_size_change(self, response):
"""Response handler for operations that change instances."""
self._wait_for_operation(response)
self._wait_for_instances()

def get(self):
"""Get an instance group for a cluster."""
return self.execute(
self.compute.instanceGroupManagers().get(
project=self.project_id,
zone=self.zone,
instanceGroupManager=self.name),
result_proc=self._identity)

def list_managed_instances(self, instance_filter=None):
"""List managed instances in the group."""
next_page_token = None

while True:
response = self.execute(
self.compute.instanceGroupManagers().listManagedInstances(
project=self.project_id,
zone=self.zone,
instanceGroupManager=self.name,
pageToken=next_page_token,
filter=instance_filter),
result_proc=self._identity)

for instance in response['managedInstances']:
if instance['currentAction'] != 'DELETING':
# Instances can be stuck in DELETING, don't include them.
yield instance

if 'nextPageToken' in response:
next_page_token = response['nextPageToken']
else:
break

def create(self,
base_instance_name,
instance_template,
size=0,
auto_healing_policy=None,
wait_for_instances=True):
"""Create this instance group."""
manager_body = {
'baseInstanceName': base_instance_name,
'instanceTemplate': 'global/instanceTemplates/' + instance_template,
'name': self.name,
'targetSize': size,
}
if auto_healing_policy:
manager_body['autoHealingPolicies'] = [auto_healing_policy]

result_proc = None
if wait_for_instances:
result_proc = self._handle_size_change

self.execute(
self.compute.instanceGroupManagers().insert(
project=self.project_id, zone=self.zone, body=manager_body),
result_proc=result_proc)

def patch_auto_healing_policies(self,
auto_healing_policy=None,
wait_for_instances=True):
"""Update the health check url of this instance group."""
if auto_healing_policy:
request_body = {'autoHealingPolicies': [auto_healing_policy]}
else:
request_body = {'autoHealingPolicies': []}

result_proc = None
if wait_for_instances:
result_proc = self._handle_size_change

self.execute(
self.compute.instanceGroupManagers().patch(
project=self.project_id,
zone=self.zone,
instanceGroupManager=self.name,
body=request_body),
result_proc=result_proc)

def resize(self, new_size, wait_for_instances=True):
"""Resize this instance group."""
result_proc = None
if wait_for_instances:
result_proc = self._handle_size_change

self.execute(
self.compute.instanceGroupManagers().resize(
project=self.project_id,
zone=self.zone,
instanceGroupManager=self.name,
size=new_size),
result_proc=result_proc)

def delete(self):
"""Delete this instance group."""
self.execute(self.compute.instanceGroupManagers().delete(
project=self.project_id, zone=self.zone,
instanceGroupManager=self.name))


class InstanceTemplate(Resource):
"""Instance template."""

def get(self):
"""Get the instance template."""
return self.execute(
self.compute.instanceTemplates().get(
instanceTemplate=self.name, project=self.project_id),
result_proc=self._identity)

def delete(self):
"""Delete the instance template."""
self.execute(self.compute.instanceTemplates().delete(
instanceTemplate=self.name, project=self.project_id))

def create(self, template_body):
"""Create the instance template."""
template_body['name'] = self.name
self.execute(self.compute.instanceTemplates().insert(
project=self.project_id, body=template_body))
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 Google LLC
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -209,4 +209,5 @@ def main():
thread_pool.submit(_load_data, fuzzer.name)

thread_pool.shutdown(wait=True)
logs.log('Load big query task finished successfully.')
return True
Loading

0 comments on commit e0d397e

Please sign in to comment.