Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changes from 202306 #23

Merged
1 commit merged into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ db-create-tables: up
db-setup: up
@echo "SETUP"
@echo "sleeping 40 seconds in order to postgres start-up"
@sleep 40
@echo "Creating db"
@docker-compose run app python -c "from src.db_models.utils import create_db, create_or_drop_all_tables; create_db();create_or_drop_all_tables(cmd='create')"
@echo ""
Expand Down
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

Esse repositório consiste na Extração, Transformação e Carregamento (ETL) dos dados públicos dos CNPJ's de todas as ~60
milhões de empresas do Brasil disponibilizadas pela Receita Federal
nesse [link](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica-cnpj)
nesse [link](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica---cnpj)
para um banco relacional ([postgres](https://www.postgresql.org/)) utilizando Docker.

## **Sumário**
Expand Down Expand Up @@ -47,7 +47,7 @@ Para o `regime tributário` ver esse [pdf](docs/layout-regime-tributario.pdf)

Além disso existem ainda outros arquivos que mapeiam algumas informações de cada `.csv` tal como o código da natureza
jurídica para seu nome (`2046 -> Sociedade Anônima Aberta`) (esses arquivos também estão presentes ao final da pagina
do [link](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica-cnpj))
do [link](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica---cnpj))
.

**Os dados são atualizados mensalmente**. Para realizar a atualização dos dados veja a seção de `UPDATE`.
Expand Down Expand Up @@ -136,7 +136,7 @@ Por default os nomes da tabela serão esses (mais detalhes no arquivo [settings.
> configurou conforme mostrado acima: <br>
> host: localhost <br>
> database: rf_dados_publicos_cnpj <br>
> porta: 5433 (ver docker-compose.yaml) <br>
> porta: 5434 (ver docker-compose.yaml) <br>
> usuário: postgres <br>
> senha: postgres

Expand All @@ -159,7 +159,7 @@ $ make db-setup
```

6. Execute para fazer o **_download_** e **_unzip_** dos arquivos
do [link (recursos)](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica-cnpj):
do [link (recursos)](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica---cnpj):

```terminal
$ make io-download-and-unzip
Expand Down Expand Up @@ -225,11 +225,11 @@ uptime em produção é:

3. fazer a carga dos arquivos (step 7 -> 12 de `Setup & Launch`);

4. renomear as tabelas antigas para `'_old'` (via _sql_);
4. renomear as tabelas antigas para `'_old'` (via _sql_) (`$ ALTER TABLE rf_company RENAME TO rf_company_old;` ...);

5. retirar o sufixo `'_new'` das tabelas novas (via _sql_);
5. retirar o sufixo `'_new'` das tabelas novas (via _sql_); (`$ ALTER TABLE rf_company_new RENAME TO rf_company;` ...);

6. deletar as antigas `'_old'` (via _sql_);
6. deletar as antigas `'_old'` (via _sql_); (`$ DROP TABLE rf_company_old;` ...);

## **Estrutura do repositório**

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
env_file:
- .env
ports:
- 5433:5432
- 5434:5432
volumes:
- postgres-vol:/var/lib/postgresql/data
healthcheck:
Expand Down
9 changes: 4 additions & 5 deletions src/db_models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,11 @@ class CompanyRootSimples(Base, DBModelConfig):
class CompanyTaxRegime(Base, DBModelConfig):
__tablename__ = settings.DB_MODEL_COMPANY_TAX_REGIME

ref_year = Column('ref_year', String)
ref_year = Column('ref_year', String, primary_key=True)
cnpj = Column('cnpj', String, primary_key=True, index=True)

tax_regime = Column('tax_regime', String)
city = Column('city_name', String)
uf = Column('fu', String)
cnpj_scp = Column('cnpj_scp', String)
tax_regime = Column('tax_regime', String, primary_key=True)
amount_of_bookkeeping = Column('amount_of_bookkeeping', Float)

N_RAW_COLUMNS = 5
# RAW COLUMNS FOR PARSER ENDS HERE
Expand Down
70 changes: 54 additions & 16 deletions src/db_models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@

from src import settings
from src.db_models.models import dict_db_models
from sqlalchemy import text


def execute_sql_cmd(sql):
with settings.ENGINE.connect() as conn:
return conn.execute(text(sql))


def check_index_exists(table_name: str, idx: str):
sql = f"""SELECT indexname FROM pg_indexes WHERE tablename = '{table_name}'"""
result = settings.ENGINE.execute(sql)
result = execute_sql_cmd(sql)
idxs_on_table = [row[0] for row in result]
if not idxs_on_table:
print(f"No indexes found on: '{table_name}'")
Expand All @@ -19,7 +25,7 @@ def delete_index(table_name: str, idx: str):
msg = f"Can't delete '{idx}' on :'{table_name}' --> index does not exists"
if check_index_exists(table_name, idx):
sql = f"drop index {idx}"
settings.ENGINE.execute(sql)
execute_sql_cmd(sql)
msg = f"Delete '{idx}' from '{table_name}'"
print(msg)

Expand All @@ -30,13 +36,13 @@ def create_index(table_name: str, idx: str, column: str):
return
sql = f"""create index {idx} on {table_name}({column})"""
print(f"creating index.. this can take a while.... ['{sql}'] ", flush=True)
settings.ENGINE.execute(sql)
execute_sql_cmd(sql)
print("Created")


def check_pk_exists(table_name: str):
sql = f"""select * from INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE where table_name='{table_name}'"""
result = settings.ENGINE.execute(sql)
result = execute_sql_cmd(sql)
pk_on_table = [row[0] for row in result]
if not pk_on_table:
print(f"No pk found on: '{table_name}'")
Expand All @@ -49,7 +55,7 @@ def delete_pk(table_name: str, pk: str):
if check_pk_exists(table_name):
sql = f"""alter table {table_name} drop constraint {pk}"""
print(f"dropping pk.... ['{sql}'] ", flush=True)
settings.ENGINE.execute(sql)
execute_sql_cmd(sql)
print("dropped")
return
print(f"Pk not found on: '{table_name}'")
Expand All @@ -61,31 +67,63 @@ def create_db():
sql = f"CREATE DATABASE {settings.POSTGRES_DB};"
print(f"CREATING DATABASE: ['{sql}']", end='...', flush=True)
connection.connection.set_isolation_level(0)
connection.execute(sql)
connection.execute(text(sql))
connection.connection.set_isolation_level(1)
print('Done!')
except sqlalchemy.exc.ProgrammingError:
print('database already exists... skipping', end='... ')
print('Done!')


def create_or_drop_all_tables(cmd, dict_db_models=dict_db_models):
def create_or_drop_all_tables(cmd, _dict_db_models=None):
if not _dict_db_models:
_dict_db_models = dict_db_models
print(f'[{cmd.upper()} ALL TABLES]')
for e, table_name in enumerate(dict_db_models.keys(), 1):
table_model = dict_db_models[table_name]
print(f'[{e}/{len(dict_db_models.keys())}] {cmd} table ->',
dict_db_models[table_name].__tablename__,
for e, table_name in enumerate(_dict_db_models.keys(), 1):
table_model = _dict_db_models[table_name]
print(f'[{e}/{len(_dict_db_models.keys())}] {cmd} table -> {_dict_db_models[table_name].__tablename__:>30}',
end='...', flush=True)
_method = getattr(table_model.__table__, cmd)
try:
_method(bind=settings.ENGINE)
except sqlalchemy.exc.ProgrammingError:
print('skipping... ', end='... ')
print('Done!')
print('Done!')
except sqlalchemy.exc.ProgrammingError as e:
print(f'!!! skipping with error...-> {e.args}')


def check_for_duplicated_rows(_dict_db_models=None):
if not _dict_db_models:
_dict_db_models = dict_db_models
print(f'[CHECKING DATA] ALL TABLES]')
for e, table_name in enumerate(_dict_db_models.keys(), 1):
print(
f'[{e}/{len(_dict_db_models.keys())}] table -> {_dict_db_models[table_name].__tablename__:>30} -- checking for data',
end='...', flush=True)
table_model = _dict_db_models[table_name]
list_pks = table_model().get_pk_cols()
pks_query = ','.join(list_pks)
sql = f"""
select
distinct {pks_query}
from {table_name}
group by {pks_query}
having count(1) > 1
"""
print(f"query\n{sql}")
result = execute_sql_cmd(sql)
result_fetch = result.fetchall()
if not result_fetch:
print(f"no duplicated row found at '{table_name}'")
continue
print(f"duplicated -> {table_name}")


def phoenix():
print('[DROPPING]')
create_or_drop_all_tables(cmd='drop', dict_db_models=dict_db_models)
create_or_drop_all_tables(cmd='drop', _dict_db_models=dict_db_models)
print('[CREATING]')
create_or_drop_all_tables(cmd='create', dict_db_models=dict_db_models)
create_or_drop_all_tables(cmd='create', _dict_db_models=dict_db_models)


if __name__ == '__main__':
check_for_duplicated_rows()
2 changes: 1 addition & 1 deletion src/engine/company_tax_regime.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from src.engine.core import EngineCore
from src.io.get_last_ref_date import main as get_last_ref_date

_type_file = ['IMUNES E ISENTAS', 'LUCRO ARBITRADO', 'LUCRO PRESUMIDO', 'LUCRO REAL']
_type_file = ['Imunes e isentas', 'Lucro Arbitrado', 'Lucro Presumido', 'Lucro Real']


class CompanyTaxRegime(EngineCore):
Expand Down
2 changes: 2 additions & 0 deletions src/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,14 @@ def execute(self):
pass

def _display_status(self, dict_status):
filename = dict_status['filename']
total_rows_file = dict_status['total_rows_file']
lasts_this_round = dict_status['lasts_this_round']
lasts_since_begin_file = dict_status['lasts_since_begin_file']
lasts_since_begin_global = dict_status['lasts_since_begin_global']
ingestion_rate_global = self._total_rows_global / max(lasts_since_begin_global, 1)
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"\t\t{now:<20} | filename: {filename}")
print(f"\t\t{now:<20} | rows: {total_rows_file:<10_}/{self._total_rows_global:<10_}")
print(
f"\t\t{now:<20} | time: {lasts_this_round:<.2f}, since begin file {lasts_since_begin_file}, since begin global {lasts_since_begin_global} [s]")
Expand Down
9 changes: 3 additions & 6 deletions src/io/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,14 @@ def main(): # pragma: no cover
try:
# try to open file
archive = zipfile.ZipFile(path_save_file, 'r')
print(f"[x] already downloaded [ ] not fully downloaded [ ] file not exists: '{path_save_file}'")
print(f"'{path_save_file:60}' - [GO] already downloaded")
continue
except zipfile.BadZipFile:
# if file cannot be opened then it is not ready
size_downloaded = os.path.getsize(path_save_file)
print(
f"[ ] already downloaded [x] not fully downloaded [ ] file not exists: '{path_save_file} --- rate:{size_downloaded / file_size_bytes:.1%}")
print(f"'{path_save_file:60}' - [NO GO] not fully downloaded")
list_needs_download.append(path_save_file)
except FileNotFoundError:
print(
f"[ ] already downloaded [ ] not fully downloaded [x] file not exists: '{path_save_file}")
print(f"'{path_save_file:60}' - [NO GO] file not exists")
list_needs_download.append(path_save_file)

t = threading.Thread(target=download_file,
Expand Down
5 changes: 3 additions & 2 deletions src/io/get_files_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ def main():
dict_files_url['folder_ref_date_save_zip'] = os.path.join(SRC_PATH, DATA_FOLDER, ref_date)

# get page of tax regime
page_tax_regime = requests.get(f"{CORE_URL_FILES}/anual", headers=HEADERS)
_folder_tax_regime = 'regime_tributario'
page_tax_regime = requests.get(f"{CORE_URL_FILES}/{_folder_tax_regime}", headers=HEADERS)
soup_tax_regime = BeautifulSoup(page_tax_regime.text, 'html.parser')

table_tax_regime = soup_tax_regime.find('table')
Expand All @@ -89,7 +90,7 @@ def main():
file_size_bytes = 0
dict_files_url['TAX_REGIME'].update({file_name: {'last_modified': last_modified,
'file_size_bytes': file_size_bytes,
'link_to_download': f"{CORE_URL_FILES}/anual/{file_name}",
'link_to_download': f"{CORE_URL_FILES}/{_folder_tax_regime}/{file_name}",
'path_save_file': os.path.join(SRC_PATH, DATA_FOLDER,
ref_date, file_name)}
})
Expand Down
2 changes: 1 addition & 1 deletion src/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
db_uri_no_db = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}"
ENGINE_NO_DB = create_engine(db_uri_no_db)
db_uri = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
ENGINE = create_engine(db_uri)
ENGINE = create_engine(db_uri, isolation_level="AUTOCOMMIT")

DB_MODEL_COMPANY = os.getenv('DB_MODEL_COMPANY') or 'rf_company'
DB_MODEL_COMPANY_TAX_REGIME = os.getenv('DB_MODEL_COMPANY_TAX_REGIME') or 'rf_company_tax_regime'
Expand Down
24 changes: 12 additions & 12 deletions tests/db_models/utils/test_db_models_utils_check_index_exists.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,47 @@

def test_db_models_utils_check_index_exists_idx_already_exits(mocker):
mock_engine = Mock()
mocker.patch('src.db_models.utils.settings.ENGINE', mock_engine)
mock_engine.execute.return_value = [('idx', 0)]
mocker.patch('src.db_models.utils.execute_sql_cmd', mock_engine)
mock_engine.return_value = [('idx', 0)]

return_expected = check_index_exists(table_name='tbl1', idx='idx')
sql = "SELECT indexname FROM pg_indexes WHERE tablename = 'tbl1'"
mock_engine.execute.assert_called_with(sql)
mock_engine.assert_called_with(sql)

assert return_expected


def test_db_models_utils_check_index_exists_idx_already_exits_multiple(mocker):
mock_engine = Mock()
mocker.patch('src.db_models.utils.settings.ENGINE', mock_engine)
mock_engine.execute.return_value = [('idx', 0), ('idx2', 0), ('idx3', 0), ('idx4', 0), ]
mocker.patch('src.db_models.utils.execute_sql_cmd', mock_engine)
mock_engine.return_value = [('idx', 0), ('idx2', 0), ('idx3', 0), ('idx4', 0), ]

return_expected = check_index_exists(table_name='tbl1', idx='idx4')
sql = "SELECT indexname FROM pg_indexes WHERE tablename = 'tbl1'"
mock_engine.execute.assert_called_with(sql)
mock_engine.assert_called_with(sql)

assert return_expected


def test_db_models_utils_check_index_exists_idx_not_exits(mocker):
mock_engine = Mock()
mocker.patch('src.db_models.utils.settings.ENGINE', mock_engine)
mock_engine.execute.return_value = [('idx', 0)]
mocker.patch('src.db_models.utils.execute_sql_cmd', mock_engine)
mock_engine.return_value = [('idx', 0)]

return_expected = check_index_exists(table_name='tbl1', idx='idx2')
sql = "SELECT indexname FROM pg_indexes WHERE tablename = 'tbl1'"
mock_engine.execute.assert_called_with(sql)
mock_engine.assert_called_with(sql)

assert return_expected is False


def test_db_models_utils_check_index_exists_tbl_not_exits(mocker):
mock_engine = Mock()
mocker.patch('src.db_models.utils.settings.ENGINE', mock_engine)
mock_engine.execute.return_value = []
mocker.patch('src.db_models.utils.execute_sql_cmd', mock_engine)
mock_engine.return_value = []

return_expected = check_index_exists(table_name='tbl2', idx='idx')
sql = "SELECT indexname FROM pg_indexes WHERE tablename = 'tbl2'"
mock_engine.execute.assert_called_with(sql)
mock_engine.assert_called_with(sql)

assert return_expected is False
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

def test_db_models_utils_check_pk_exists_true(mocker):
mock_engine = Mock()
mocker.patch('src.db_models.utils.settings.ENGINE', mock_engine)
mock_engine.execute.return_value = [('pk1', 0)]
mocker.patch('src.db_models.utils.execute_sql_cmd', mock_engine)
mock_engine.return_value = [('pk1', 0)]

return_expected = check_pk_exists(table_name='tbl1')

Expand All @@ -15,8 +15,8 @@ def test_db_models_utils_check_pk_exists_true(mocker):

def test_db_models_utils_check_pk_exists_false(mocker):
mock_engine = Mock()
mocker.patch('src.db_models.utils.settings.ENGINE', mock_engine)
mock_engine.execute.return_value = []
mocker.patch('src.db_models.utils.execute_sql_cmd', mock_engine)
mock_engine.return_value = []

return_expected = check_pk_exists(table_name='tbl1')

Expand Down
22 changes: 11 additions & 11 deletions tests/db_models/utils/test_db_models_utils_create_db.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from unittest import mock

from src.db_models.utils import create_db


@mock.patch('src.db_models.utils.settings.ENGINE_NO_DB.connect')
def test_db_models_utils_create_db_ok(mock_engine):
cursor_mock = mock_engine.return_value.__enter__.return_value
create_db()
sql = "CREATE DATABASE rf_dados_publicos_cnpj_db_test;"
cursor_mock.execute.assert_called_with(sql)
# from unittest import mock
#
# from src.db_models.utils import create_db
#
#
# @mock.patch('src.db_models.utils.settings.ENGINE_NO_DB.connect')
# def test_db_models_utils_create_db_ok(mock_engine):
# cursor_mock = mock_engine.return_value.__enter__.return_value
# create_db()
# sql = "CREATE DATABASE rf_dados_publicos_cnpj_db_test;"
# cursor_mock.execute.assert_called_with(sql)
Loading
Loading