Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
lorinkoz committed Feb 22, 2021
1 parent b07fad5 commit fbf0efa
Show file tree
Hide file tree
Showing 14 changed files with 175 additions and 127 deletions.
68 changes: 47 additions & 21 deletions django_pgschemas/management/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.db.models import CharField, Q
from django.db.models import Value as V
from django.db.models import CharField, Q, Value as V
from django.db.models.functions import Concat
from django.db.utils import ProgrammingError

Expand Down Expand Up @@ -66,6 +65,9 @@ def add_arguments(self, parser):
dest="excluded_schemas",
help="Schema(s) to exclude when executing the current command",
)
parser.add_argument(
"--sdb", nargs="?", dest="schema_database", default="default", help="Database to operate with the schema(s)"
)
parser.add_argument(
"--parallel",
dest="parallel",
Expand All @@ -80,6 +82,7 @@ def add_arguments(self, parser):
)

def get_schemas_from_options(self, **options):
database = options.get("database") or options.get("schema_database")
skip_schema_creation = options.get("skip_schema_creation", False)
try:
schemas = self._get_schemas_from_options(**options)
Expand All @@ -96,7 +99,7 @@ def get_schemas_from_options(self, **options):
raise CommandError("This command can only run in %s" % self.specific_schemas)
if not skip_schema_creation:
for schema in schemas:
create_schema(schema, check_if_exists=True, sync_schema=False, verbosity=0)
create_schema(schema, database, check_if_exists=True, sync_schema=False, verbosity=0)
return schemas

def get_executor_from_options(self, **options):
Expand All @@ -106,6 +109,7 @@ def get_scope_display(self):
return "|".join(self.specific_schemas or []) or self.scope

def _get_schemas_from_options(self, **options):
database = options.get("database") or options.get("schema_database")
schemas = options.get("schemas") or []
excluded_schemas = options.get("excluded_schemas") or []
include_all_schemas = options.get("all_schemas") or False
Expand Down Expand Up @@ -139,9 +143,19 @@ def _get_schemas_from_options(self, **options):
raise CommandError("No schema provided")

TenantModel = get_tenant_model()
static_schemas = [x for x in settings.TENANTS.keys() if x != "default"] if allow_static else []
static_schemas = (
[
x
for x in settings.TENANTS.keys()
if x != "default" and database in (settings.TENANTS[x].get("DATABASES") or ["default"])
]
if allow_static
else []
)
dynamic_schemas = (
TenantModel.objects.values_list("schema_name", flat=True) if dynamic_ready and allow_dynamic else []
[x.schema_name for x in TenantModel.objects.all() if x.get_database() == database]
if dynamic_ready and allow_dynamic
else []
)
if clone_reference and allow_static:
static_schemas.append(clone_reference)
Expand Down Expand Up @@ -172,8 +186,10 @@ def _get_schemas_from_options(self, **options):
schemas_to_return.add(schema)
elif schema == clone_reference:
schemas_to_return.add(schema)
elif dynamic_ready and TenantModel.objects.filter(schema_name=schema).exists() and allow_dynamic:
schemas_to_return.add(schema)
elif dynamic_ready and allow_dynamic:
tenant = TenantModel.objects.filter(schema_name=schema).first()
if tenant and tenant.get_database() == database:
schemas_to_return.add(schema)

schemas = list(set(schemas) - schemas_to_return)

Expand All @@ -187,13 +203,18 @@ def _get_schemas_from_options(self, **options):
and any([x for x in data["DOMAINS"] if x.startswith(schema)])
]
if dynamic_ready and allow_dynamic:
local += (
TenantModel.objects.annotate(
route=Concat("domains__domain", V("/"), "domains__folder", output_field=CharField())
)
.filter(Q(schema_name=schema) | Q(domains__domain__istartswith=schema) | Q(route=schema))
.distinct()
.values_list("schema_name", flat=True)
local += list(
{
x.schema_name
for x in (
TenantModel.objects.annotate(
route=Concat("domains__domain", V("/"), "domains__folder", output_field=CharField())
)
.filter(Q(schema_name=schema) | Q(domains__domain__istartswith=schema) | Q(route=schema))
.distinct()
)
if x.get_database() == database
}
)
if not local:
raise CommandError("No schema found for '%s'" % schema)
Expand All @@ -215,13 +236,18 @@ def _get_schemas_from_options(self, **options):
if schema_name not in ["public", "default", clone_reference]
and any([x for x in data["DOMAINS"] if x.startswith(schema)])
]
local += (
TenantModel.objects.annotate(
route=Concat("domains__domain", V("/"), "domains__folder", output_field=CharField())
)
.filter(Q(schema_name=schema) | Q(domains__domain__istartswith=schema) | Q(route=schema))
.distinct()
.values_list("schema_name", flat=True)
local += list(
{
x.schema_name
for x in (
TenantModel.objects.annotate(
route=Concat("domains__domain", V("/"), "domains__folder", output_field=CharField())
)
.filter(Q(schema_name=schema) | Q(domains__domain__istartswith=schema) | Q(route=schema))
.distinct()
)
if x.get_database() == database
}
)
if not local:
raise CommandError("No schema found for '%s' (excluded)" % schema)
Expand Down
5 changes: 4 additions & 1 deletion django_pgschemas/management/commands/cloneschema.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ def add_arguments(self, parser):
super().add_arguments(parser)
parser.add_argument("source", help="The name of the schema you want to clone")
parser.add_argument("destination", help="The name of the schema you want to create as clone")
parser.add_argument(
"--sdb", nargs="?", dest="schema_database", default="default", help="Database to operate with the schema(s)"
)
parser.add_argument(
"--noinput",
"--no-input",
Expand Down Expand Up @@ -104,7 +107,7 @@ def handle(self, *args, **options):
if TenantModel.objects.filter(schema_name=options["source"]).exists():
tenant, domain = self.get_dynamic_tenant(**options)
try:
clone_schema(options["source"], options["destination"], dry_run)
clone_schema(options["source"], options["destination"], options["schema_database"], dry_run)
if tenant and domain:
if options["verbosity"] >= 1:
self.stdout.write("Schema cloned.")
Expand Down
31 changes: 18 additions & 13 deletions django_pgschemas/management/commands/createrefschema.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from django.conf import settings
from django.core.checks import Tags, run_checks
from django.core.management.base import BaseCommand, CommandError

Expand All @@ -20,18 +21,22 @@ def handle(self, *args, **options):
clone_reference = get_clone_reference()
if not clone_reference:
raise CommandError("There is no reference schema configured.")
if options.get("recreate", False):
drop_schema(clone_reference, check_if_exists=True, verbosity=options["verbosity"])
for database in settings.DATABASES:
if options.get("recreate", False):
drop_schema(clone_reference, database=database, check_if_exists=True, verbosity=options["verbosity"])
if options["verbosity"] >= 1:
self.stdout.write(f"[{database}] Destroyed existing reference schema.")
created = create_schema(
clone_reference, database=database, check_if_exists=True, verbosity=options["verbosity"]
)
if options["verbosity"] >= 1:
self.stdout.write("Destroyed existing reference schema.")
created = create_schema(clone_reference, check_if_exists=True, verbosity=options["verbosity"])
if options["verbosity"] >= 1:
if created:
self.stdout.write("Reference schema successfully created!")
else:
self.stdout.write("Reference schema already exists.")
self.stdout.write(
self.style.WARNING(
"Run this command again with --recreate if you want to recreate the reference schema."
if created:
self.stdout.write(f"[{database}] Reference schema successfully created!")
else:
self.stdout.write(f"[{database}] Reference schema already exists.")
self.stdout.write(
self.style.WARNING(
f"[{database}] Run this command again with --recreate if you want to "
"recreate the reference schema."
)
)
)
1 change: 1 addition & 0 deletions django_pgschemas/management/commands/runschema.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def handle(self, *args, **options):
options.pop("static_schemas")
options.pop("dynamic_schemas")
options.pop("tenant_schemas")
options.pop("schema_database")
options.pop("parallel")
options.pop("skip_schema_creation")
if self.allow_interactive:
Expand Down
8 changes: 5 additions & 3 deletions django_pgschemas/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def save(self, verbosity=1, *args, **kwargs):
elif is_new:
# Although we are not using the schema functions directly, the signal might be registered by a listener
schema_needs_sync.send(sender=TenantMixin, tenant=self.serializable_fields())
elif not is_new and self.auto_create_schema and not schema_exists(self.schema_name):
elif not is_new and self.auto_create_schema and not schema_exists(self.schema_name, self.get_database()):
# Create schemas for existing models, deleting only the schema on failure
try:
self.create_schema(verbosity=verbosity)
Expand Down Expand Up @@ -81,13 +81,15 @@ def create_schema(self, sync_schema=True, verbosity=1):
"""
Creates or clones the schema ``schema_name`` for this tenant.
"""
return create_or_clone_schema(self.schema_name, sync_schema, verbosity)
return create_or_clone_schema(
self.schema_name, database=self.get_database(), sync_schema=sync_schema, verbosity=verbosity
)

def drop_schema(self):
"""
Drops the schema.
"""
return drop_schema(self.schema_name)
return drop_schema(self.schema_name, database=self.get_database())

def get_primary_domain(self):
try:
Expand Down
23 changes: 19 additions & 4 deletions django_pgschemas/routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,44 @@
from django.conf import settings

from .schema import schema_handler
from .utils import get_tenant_database_alias
from .utils import get_clone_reference


class SyncRouter(object):
"""
A router to control which applications will be synced depending on the schema we're syncing.
It also controls database for read/write in a tenant sharding configuration.
"""

def app_in_list(self, app_label, app_list):
app_config = apps.get_app_config(app_label)
app_config_full_name = "{}.{}".format(app_config.__module__, app_config.__class__.__name__)
return (app_config.name in app_list) or (app_config_full_name in app_list)

def db_for_read(self, model, **hints):
if not schema_handler.active or schema_handler.active.schema_name in ["public", get_clone_reference()]:
return None
return schema_handler.active.get_database()

def db_for_write(self, model, **hints):
if not schema_handler.active or schema_handler.active.schema_name in ["public", get_clone_reference()]:
return None
return schema_handler.active.get_database()

def allow_migrate(self, db, app_label, model_name=None, **hints):
if db != get_tenant_database_alias() or not schema_handler.active:
if not schema_handler.active:
return False
app_list = []
databases = []
if schema_handler.active.schema_name == "public":
app_list = settings.TENANTS["public"]["APPS"]
databases = settings.TENANTS["public"].get("DATABASES") or ["default"]
elif schema_handler.active.schema_name in settings.TENANTS:
app_list = settings.TENANTS[schema_handler.active.schema_name]["APPS"]
databases = settings.TENANTS[schema_handler.active.schema_name].get("DATABASES") or ["default"]
else:
app_list = settings.TENANTS["default"]["APPS"]
if not app_list:
databases = settings.TENANTS["default"].get("DATABASES") or ["default"]
if not app_list or not databases:
return None
return self.app_in_list(app_label, app_list)
return db in databases and self.app_in_list(app_label, app_list)
10 changes: 10 additions & 0 deletions django_pgschemas/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ def set_schema(self, schema_descriptor):
"""
Main API method to set current schema.
"""
from django.contrib.contenttypes.models import ContentType

assert isinstance(
schema_descriptor, SchemaDescriptor
), "'set_schema' must be called with a SchemaDescriptor descendant"

schema_descriptor.ready = False # Defines whether search path has been set
ContentType.objects.clear_cache() # Attempting to catch change of database
self.set_active_schema(schema_descriptor)

def set_schema_to(self, schema_name, domain_url=None, folder=None):
Expand Down Expand Up @@ -93,3 +97,9 @@ def get_primary_domain(self):
if self.domain_url:
return "/".join([self.domain_url, self.folder]) if self.folder else self.domain_url
return None

def get_database(self):
"""
Returns the database to use for this schema.
"""
return "default"
2 changes: 1 addition & 1 deletion django_pgschemas/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@
def tenant_delete_callback(sender, instance, **kwargs):
if not isinstance(instance, get_tenant_model()):
return
if instance.auto_drop_schema and schema_exists(instance.schema_name):
if instance.auto_drop_schema and schema_exists(instance.schema_name, instance.get_database()):
schema_pre_drop.send(sender=get_tenant_model(), tenant=instance.serializable_fields())
instance.drop_schema()
Loading

0 comments on commit fbf0efa

Please sign in to comment.