Commit d90efc9 — changes from 202306
André Tayer committed Dec 27, 2023
1 parent 6a01b8b

Showing 19 changed files with 142 additions and 100 deletions.
1 change: 0 additions & 1 deletion Makefile
@@ -93,7 +93,6 @@ db-create-tables: up
db-setup: up
@echo "SETUP"
@echo "sleeping 40 seconds to let postgres start up"
@sleep 40
@echo "Creating db"
@docker-compose run app python -c "from src.db_models.utils import create_db, create_or_drop_all_tables; create_db();create_or_drop_all_tables(cmd='create')"
@echo ""
14 changes: 7 additions & 7 deletions README.md
@@ -4,7 +4,7 @@

This repository consists of the Extraction, Transformation, and Loading (ETL) of the public CNPJ data of all of
Brazil's ~60 million companies, made available by the Receita Federal
at this [link](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica-cnpj)
at this [link](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica---cnpj)
into a relational database ([postgres](https://www.postgresql.org/)) using Docker.

## **Table of Contents**
@@ -47,7 +47,7 @@ For the `regime tributário`, see this [pdf](docs/layout-regime-tributario.pdf)

In addition, there are other files that map some information in each `.csv`, such as the legal-nature
code to its name (`2046 -> Sociedade Anônima Aberta`) (these files are also available at the end of the page
at the [link](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica-cnpj))
at the [link](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica---cnpj))
.

**The data is updated monthly**. To update the data, see the `UPDATE` section.
@@ -136,7 +136,7 @@ By default, the table names will be these (more details in the file [settings.
> configured as shown above: <br>
> host: localhost <br>
> database: rf_dados_publicos_cnpj <br>
> port: 5433 (see docker-compose.yaml) <br>
> port: 5434 (see docker-compose.yaml) <br>
> user: postgres <br>
> password: postgres
@@ -159,7 +159,7 @@ $ make db-setup
```

6. Run this to **_download_** and **_unzip_** the files
from the [link (recursos)](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica-cnpj):
from the [link (recursos)](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica---cnpj):

```terminal
$ make io-download-and-unzip
@@ -225,11 +225,11 @@ uptime in production is:
3. load the files (steps 7 -> 12 of `Setup & Launch`);
4. rename the old tables to `'_old'` (via _sql_);
4. rename the old tables to `'_old'` (via _sql_) (`$ ALTER TABLE rf_company RENAME TO rf_company_old;` ...);
5. remove the `'_new'` suffix from the new tables (via _sql_);
5. remove the `'_new'` suffix from the new tables (via _sql_) (`$ ALTER TABLE rf_company_new RENAME TO rf_company;` ...);
6. delete the old `'_old'` tables (via _sql_);
6. delete the old `'_old'` tables (via _sql_) (`$ DROP TABLE rf_company_old;` ...);
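The rename-and-drop sequence in steps 4–6 can be sketched as plain SQL-statement generation. This is a sketch, not a script that ships with the repository; `rf_company` is one of the default table names from `settings.py`:

```python
# Sketch of the '_new' -> live -> '_old' table swap described in the
# update steps above. The helper only builds the SQL strings; running
# them against the database is left to the operator.

def swap_statements(table: str) -> list[str]:
    """Generate the SQL that promotes '<table>_new' and retires the old table."""
    return [
        f"ALTER TABLE {table} RENAME TO {table}_old;",   # step 4: park the old table
        f"ALTER TABLE {table}_new RENAME TO {table};",   # step 5: promote the new one
        f"DROP TABLE {table}_old;",                      # step 6: drop the parked copy
    ]

for stmt in swap_statements("rf_company"):
    print(stmt)
```

The same three statements would be repeated for each of the other default tables.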
## **Repository structure**
2 changes: 1 addition & 1 deletion docker-compose.yaml
@@ -7,7 +7,7 @@ services:
env_file:
- .env
ports:
- 5433:5432
- 5434:5432
volumes:
- postgres-vol:/var/lib/postgresql/data
healthcheck:
9 changes: 4 additions & 5 deletions src/db_models/models.py
@@ -134,12 +134,11 @@ class CompanyRootSimples(Base, DBModelConfig):
class CompanyTaxRegime(Base, DBModelConfig):
__tablename__ = settings.DB_MODEL_COMPANY_TAX_REGIME

ref_year = Column('ref_year', String)
ref_year = Column('ref_year', String, primary_key=True)
cnpj = Column('cnpj', String, primary_key=True, index=True)

tax_regime = Column('tax_regime', String)
city = Column('city_name', String)
uf = Column('fu', String)
cnpj_scp = Column('cnpj_scp', String)
tax_regime = Column('tax_regime', String, primary_key=True)
amount_of_bookkeeping = Column('amount_of_bookkeeping', Float)

N_RAW_COLUMNS = 5
# RAW COLUMNS FOR PARSER ENDS HERE
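The `CompanyTaxRegime` change above turns `(ref_year, cnpj, tax_regime)` into a composite primary key, so the same CNPJ may appear once per reference year and tax regime. A minimal pure-Python sketch of what that uniqueness constraint means (hypothetical rows, not real data):

```python
# The composite primary key (ref_year, cnpj, tax_regime) identifies a row
# by the whole tuple; only exact repeats of all three values collide.
rows = [
    {"ref_year": "2022", "cnpj": "00000000000191", "tax_regime": "LUCRO REAL"},
    {"ref_year": "2023", "cnpj": "00000000000191", "tax_regime": "LUCRO REAL"},
    {"ref_year": "2023", "cnpj": "00000000000191", "tax_regime": "LUCRO REAL"},  # duplicate key
]

seen: dict[tuple, dict] = {}
for row in rows:
    key = (row["ref_year"], row["cnpj"], row["tax_regime"])
    seen[key] = row  # last write wins here; a real PK would reject the duplicate

print(len(seen))  # 2 distinct keys out of 3 rows
```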
70 changes: 54 additions & 16 deletions src/db_models/utils.py
@@ -2,11 +2,17 @@

from src import settings
from src.db_models.models import dict_db_models
from sqlalchemy import text


def execute_sql_cmd(sql):
with settings.ENGINE.connect() as conn:
return conn.execute(text(sql))


def check_index_exists(table_name: str, idx: str):
sql = f"""SELECT indexname FROM pg_indexes WHERE tablename = '{table_name}'"""
result = settings.ENGINE.execute(sql)
result = execute_sql_cmd(sql)
idxs_on_table = [row[0] for row in result]
if not idxs_on_table:
print(f"No indexes found on: '{table_name}'")
@@ -19,7 +25,7 @@ def delete_index(table_name: str, idx: str):
msg = f"Can't delete '{idx}' on '{table_name}' --> index does not exist"
if check_index_exists(table_name, idx):
sql = f"drop index {idx}"
settings.ENGINE.execute(sql)
execute_sql_cmd(sql)
msg = f"Delete '{idx}' from '{table_name}'"
print(msg)

@@ -30,13 +36,13 @@ def create_index(table_name: str, idx: str, column: str):
return
sql = f"""create index {idx} on {table_name}({column})"""
print(f"creating index.. this can take a while.... ['{sql}'] ", flush=True)
settings.ENGINE.execute(sql)
execute_sql_cmd(sql)
print("Created")


def check_pk_exists(table_name: str):
sql = f"""select * from INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE where table_name='{table_name}'"""
result = settings.ENGINE.execute(sql)
result = execute_sql_cmd(sql)
pk_on_table = [row[0] for row in result]
if not pk_on_table:
print(f"No pk found on: '{table_name}'")
@@ -49,7 +55,7 @@ def delete_pk(table_name: str, pk: str):
if check_pk_exists(table_name):
sql = f"""alter table {table_name} drop constraint {pk}"""
print(f"dropping pk.... ['{sql}'] ", flush=True)
settings.ENGINE.execute(sql)
execute_sql_cmd(sql)
print("dropped")
return
print(f"Pk not found on: '{table_name}'")
@@ -61,31 +67,63 @@ def create_db():
sql = f"CREATE DATABASE {settings.POSTGRES_DB};"
print(f"CREATING DATABASE: ['{sql}']", end='...', flush=True)
connection.connection.set_isolation_level(0)
connection.execute(sql)
connection.execute(text(sql))
connection.connection.set_isolation_level(1)
print('Done!')
except sqlalchemy.exc.ProgrammingError:
print('database already exists... skipping', end='... ')
print('Done!')


def create_or_drop_all_tables(cmd, dict_db_models=dict_db_models):
def create_or_drop_all_tables(cmd, _dict_db_models=None):
if not _dict_db_models:
_dict_db_models = dict_db_models
print(f'[{cmd.upper()} ALL TABLES]')
for e, table_name in enumerate(dict_db_models.keys(), 1):
table_model = dict_db_models[table_name]
print(f'[{e}/{len(dict_db_models.keys())}] {cmd} table ->',
dict_db_models[table_name].__tablename__,
for e, table_name in enumerate(_dict_db_models.keys(), 1):
table_model = _dict_db_models[table_name]
print(f'[{e}/{len(_dict_db_models.keys())}] {cmd} table -> {_dict_db_models[table_name].__tablename__:>30}',
end='...', flush=True)
_method = getattr(table_model.__table__, cmd)
try:
_method(bind=settings.ENGINE)
except sqlalchemy.exc.ProgrammingError:
print('skipping... ', end='... ')
print('Done!')
print('Done!')
except sqlalchemy.exc.ProgrammingError as e:
print(f'!!! skipping with error...-> {e.args}')


def check_for_duplicated_rows(_dict_db_models=None):
if not _dict_db_models:
_dict_db_models = dict_db_models
print('[CHECKING DATA - ALL TABLES]')
for e, table_name in enumerate(_dict_db_models.keys(), 1):
print(
f'[{e}/{len(_dict_db_models.keys())}] table -> {_dict_db_models[table_name].__tablename__:>30} -- checking for data',
end='...', flush=True)
table_model = _dict_db_models[table_name]
list_pks = table_model().get_pk_cols()
pks_query = ','.join(list_pks)
sql = f"""
select
distinct {pks_query}
from {table_name}
group by {pks_query}
having count(1) > 1
"""
print(f"query\n{sql}")
result = execute_sql_cmd(sql)
result_fetch = result.fetchall()
if not result_fetch:
print(f"no duplicated row found at '{table_name}'")
continue
print(f"duplicated -> {table_name}")


def phoenix():
print('[DROPPING]')
create_or_drop_all_tables(cmd='drop', dict_db_models=dict_db_models)
create_or_drop_all_tables(cmd='drop', _dict_db_models=dict_db_models)
print('[CREATING]')
create_or_drop_all_tables(cmd='create', dict_db_models=dict_db_models)
create_or_drop_all_tables(cmd='create', _dict_db_models=dict_db_models)


if __name__ == '__main__':
check_for_duplicated_rows()
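The new `execute_sql_cmd` wrapper exists because SQLAlchemy 2.x removed `Engine.execute` and requires statements to be wrapped in `text()`; the test changes later in this commit patch that wrapper directly instead of the engine. A minimal sketch of that testing pattern using only the standard library — here the executor is passed as a parameter for illustration, whereas the repository imports it:

```python
# Sketch: unit-testing check_index_exists-style code by stubbing the SQL
# wrapper, mirroring the mocker.patch('...execute_sql_cmd', ...) calls in
# the test files of this commit.
from unittest.mock import Mock

def check_index_exists(table_name: str, idx: str, execute_sql_cmd) -> bool:
    # Simplified copy of the function in src/db_models/utils.py, with the
    # executor injected so no database (or SQLAlchemy engine) is needed.
    sql = f"SELECT indexname FROM pg_indexes WHERE tablename = '{table_name}'"
    result = execute_sql_cmd(sql)
    idxs_on_table = [row[0] for row in result]
    return idx in idxs_on_table

fake_execute = Mock(return_value=[("idx_cnpj", 0)])
assert check_index_exists("rf_company", "idx_cnpj", fake_execute)
fake_execute.assert_called_with(
    "SELECT indexname FROM pg_indexes WHERE tablename = 'rf_company'"
)
```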
2 changes: 1 addition & 1 deletion src/engine/company_tax_regime.py
@@ -10,7 +10,7 @@
from src.engine.core import EngineCore
from src.io.get_last_ref_date import main as get_last_ref_date

_type_file = ['IMUNES E ISENTAS', 'LUCRO ARBITRADO', 'LUCRO PRESUMIDO', 'LUCRO REAL']
_type_file = ['Imunes e isentas', 'Lucro Arbitrado', 'Lucro Presumido', 'Lucro Real']


class CompanyTaxRegime(EngineCore):
2 changes: 2 additions & 0 deletions src/engine/core.py
@@ -95,12 +95,14 @@ def execute(self):
pass

def _display_status(self, dict_status):
filename = dict_status['filename']
total_rows_file = dict_status['total_rows_file']
lasts_this_round = dict_status['lasts_this_round']
lasts_since_begin_file = dict_status['lasts_since_begin_file']
lasts_since_begin_global = dict_status['lasts_since_begin_global']
ingestion_rate_global = self._total_rows_global / max(lasts_since_begin_global, 1)
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"\t\t{now:<20} | filename: {filename}")
print(f"\t\t{now:<20} | rows: {total_rows_file:<10_}/{self._total_rows_global:<10_}")
print(
f"\t\t{now:<20} | time: {lasts_this_round:<.2f}, since begin file {lasts_since_begin_file}, since begin global {lasts_since_begin_global} [s]")
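The status lines in `_display_status` lean on f-string format specs: `:<20` left-aligns to a 20-character field, and `:<10_` additionally groups the digits of an integer with underscores. A standalone sketch with made-up row counts:

```python
# Demonstrates the format specs used by _display_status above.
from datetime import datetime

now = datetime(2023, 12, 27, 12, 0, 0).strftime('%Y-%m-%d %H:%M:%S')
total_rows_file = 1234567      # hypothetical per-file counter
total_rows_global = 7654321    # hypothetical global counter

# :<20 pads the timestamp; :<10_ pads and underscore-groups the counts
line = f"{now:<20} | rows: {total_rows_file:<10_}/{total_rows_global:<10_}"
print(line)
```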
9 changes: 3 additions & 6 deletions src/io/download.py
@@ -33,17 +33,14 @@ def main():  # pragma: no cover
try:
# try to open file
archive = zipfile.ZipFile(path_save_file, 'r')
print(f"[x] already downloaded [ ] not fully downloaded [ ] file not exists: '{path_save_file}'")
print(f"'{path_save_file:60}' - [GO] already downloaded")
continue
except zipfile.BadZipFile:
# if file cannot be opened then it is not ready
size_downloaded = os.path.getsize(path_save_file)
print(
f"[ ] already downloaded [x] not fully downloaded [ ] file not exists: '{path_save_file} --- rate:{size_downloaded / file_size_bytes:.1%}")
print(f"'{path_save_file:60}' - [NO GO] not fully downloaded")
list_needs_download.append(path_save_file)
except FileNotFoundError:
print(
f"[ ] already downloaded [ ] not fully downloaded [x] file not exists: '{path_save_file}")
print(f"'{path_save_file:60}' - [NO GO] file does not exist")
list_needs_download.append(path_save_file)

t = threading.Thread(target=download_file,
5 changes: 3 additions & 2 deletions src/io/get_files_dict.py
@@ -67,7 +67,8 @@ def main():
dict_files_url['folder_ref_date_save_zip'] = os.path.join(SRC_PATH, DATA_FOLDER, ref_date)

# get page of tax regime
page_tax_regime = requests.get(f"{CORE_URL_FILES}/anual", headers=HEADERS)
_folder_tax_regime = 'regime_tributario'
page_tax_regime = requests.get(f"{CORE_URL_FILES}/{_folder_tax_regime}", headers=HEADERS)
soup_tax_regime = BeautifulSoup(page_tax_regime.text, 'html.parser')

table_tax_regime = soup_tax_regime.find('table')
@@ -89,7 +90,7 @@
file_size_bytes = 0
dict_files_url['TAX_REGIME'].update({file_name: {'last_modified': last_modified,
'file_size_bytes': file_size_bytes,
'link_to_download': f"{CORE_URL_FILES}/anual/{file_name}",
'link_to_download': f"{CORE_URL_FILES}/{_folder_tax_regime}/{file_name}",
'path_save_file': os.path.join(SRC_PATH, DATA_FOLDER,
ref_date, file_name)}
})
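The fix above points the tax-regime scrape at the `regime_tributario` folder instead of `anual`; the download link is then just that folder URL plus the file name. A sketch of the assembly — the `CORE_URL_FILES` value and the file name are assumptions for illustration, not values confirmed by this diff:

```python
# Sketch of how link_to_download is assembled after this change.
CORE_URL_FILES = "https://example.gov.br/dados/cnpj"  # assumption, see settings
_folder_tax_regime = "regime_tributario"              # the corrected folder
file_name = "Imunes e isentas.zip"                    # hypothetical file name

link_to_download = f"{CORE_URL_FILES}/{_folder_tax_regime}/{file_name}"
print(link_to_download)
```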
2 changes: 1 addition & 1 deletion src/settings.py
@@ -12,7 +12,7 @@
db_uri_no_db = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}"
ENGINE_NO_DB = create_engine(db_uri_no_db)
db_uri = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
ENGINE = create_engine(db_uri)
ENGINE = create_engine(db_uri, isolation_level="AUTOCOMMIT")

DB_MODEL_COMPANY = os.getenv('DB_MODEL_COMPANY') or 'rf_company'
DB_MODEL_COMPANY_TAX_REGIME = os.getenv('DB_MODEL_COMPANY_TAX_REGIME') or 'rf_company_tax_regime'
24 changes: 12 additions & 12 deletions tests/db_models/utils/test_db_models_utils_check_index_exists.py
@@ -5,47 +5,47 @@

def test_db_models_utils_check_index_exists_idx_already_exits(mocker):
mock_engine = Mock()
mocker.patch('src.db_models.utils.settings.ENGINE', mock_engine)
mock_engine.execute.return_value = [('idx', 0)]
mocker.patch('src.db_models.utils.execute_sql_cmd', mock_engine)
mock_engine.return_value = [('idx', 0)]

return_expected = check_index_exists(table_name='tbl1', idx='idx')
sql = "SELECT indexname FROM pg_indexes WHERE tablename = 'tbl1'"
mock_engine.execute.assert_called_with(sql)
mock_engine.assert_called_with(sql)

assert return_expected


def test_db_models_utils_check_index_exists_idx_already_exits_multiple(mocker):
mock_engine = Mock()
mocker.patch('src.db_models.utils.settings.ENGINE', mock_engine)
mock_engine.execute.return_value = [('idx', 0), ('idx2', 0), ('idx3', 0), ('idx4', 0), ]
mocker.patch('src.db_models.utils.execute_sql_cmd', mock_engine)
mock_engine.return_value = [('idx', 0), ('idx2', 0), ('idx3', 0), ('idx4', 0), ]

return_expected = check_index_exists(table_name='tbl1', idx='idx4')
sql = "SELECT indexname FROM pg_indexes WHERE tablename = 'tbl1'"
mock_engine.execute.assert_called_with(sql)
mock_engine.assert_called_with(sql)

assert return_expected


def test_db_models_utils_check_index_exists_idx_not_exits(mocker):
mock_engine = Mock()
mocker.patch('src.db_models.utils.settings.ENGINE', mock_engine)
mock_engine.execute.return_value = [('idx', 0)]
mocker.patch('src.db_models.utils.execute_sql_cmd', mock_engine)
mock_engine.return_value = [('idx', 0)]

return_expected = check_index_exists(table_name='tbl1', idx='idx2')
sql = "SELECT indexname FROM pg_indexes WHERE tablename = 'tbl1'"
mock_engine.execute.assert_called_with(sql)
mock_engine.assert_called_with(sql)

assert return_expected is False


def test_db_models_utils_check_index_exists_tbl_not_exits(mocker):
mock_engine = Mock()
mocker.patch('src.db_models.utils.settings.ENGINE', mock_engine)
mock_engine.execute.return_value = []
mocker.patch('src.db_models.utils.execute_sql_cmd', mock_engine)
mock_engine.return_value = []

return_expected = check_index_exists(table_name='tbl2', idx='idx')
sql = "SELECT indexname FROM pg_indexes WHERE tablename = 'tbl2'"
mock_engine.execute.assert_called_with(sql)
mock_engine.assert_called_with(sql)

assert return_expected is False
8 changes: 4 additions & 4 deletions tests/db_models/utils/test_db_models_utils_check_pk_exists.py
@@ -5,8 +5,8 @@

def test_db_models_utils_check_pk_exists_true(mocker):
mock_engine = Mock()
mocker.patch('src.db_models.utils.settings.ENGINE', mock_engine)
mock_engine.execute.return_value = [('pk1', 0)]
mocker.patch('src.db_models.utils.execute_sql_cmd', mock_engine)
mock_engine.return_value = [('pk1', 0)]

return_expected = check_pk_exists(table_name='tbl1')

@@ -15,8 +15,8 @@ def test_db_models_utils_check_pk_exists_true(mocker):

def test_db_models_utils_check_pk_exists_false(mocker):
mock_engine = Mock()
mocker.patch('src.db_models.utils.settings.ENGINE', mock_engine)
mock_engine.execute.return_value = []
mocker.patch('src.db_models.utils.execute_sql_cmd', mock_engine)
mock_engine.return_value = []

return_expected = check_pk_exists(table_name='tbl1')

22 changes: 11 additions & 11 deletions tests/db_models/utils/test_db_models_utils_create_db.py
@@ -1,11 +1,11 @@
from unittest import mock

from src.db_models.utils import create_db


@mock.patch('src.db_models.utils.settings.ENGINE_NO_DB.connect')
def test_db_models_utils_create_db_ok(mock_engine):
cursor_mock = mock_engine.return_value.__enter__.return_value
create_db()
sql = "CREATE DATABASE rf_dados_publicos_cnpj_db_test;"
cursor_mock.execute.assert_called_with(sql)
# from unittest import mock
#
# from src.db_models.utils import create_db
#
#
# @mock.patch('src.db_models.utils.settings.ENGINE_NO_DB.connect')
# def test_db_models_utils_create_db_ok(mock_engine):
# cursor_mock = mock_engine.return_value.__enter__.return_value
# create_db()
# sql = "CREATE DATABASE rf_dados_publicos_cnpj_db_test;"
# cursor_mock.execute.assert_called_with(sql)