From 6bc037f738e3c8d8adbebbb13eafbbda548ec1e8 Mon Sep 17 00:00:00 2001 From: wildemberg-sales Date: Tue, 21 May 2024 10:35:08 -0300 Subject: [PATCH 1/8] Adicionando variaveis do banco no env e criando modulo de agregados > > > Co-authored-by: pkbceira03 > Co-authored-by: csafurtado --- aggregates/__init__.py | 6 +++ aggregates/aggregates_access.py | 96 +++++++++++++++++++++++++++++++++ config/config.py | 5 ++ config/sample.env | 5 ++ 4 files changed, 112 insertions(+) create mode 100644 aggregates/__init__.py create mode 100644 aggregates/aggregates_access.py diff --git a/aggregates/__init__.py b/aggregates/__init__.py new file mode 100644 index 0000000..da23c65 --- /dev/null +++ b/aggregates/__init__.py @@ -0,0 +1,6 @@ +from .aggregates_access import ( + AggregatesAccess, + AggregatesAccessInterface, + AggregatesDatabaseInterface, + create_aggregates_interface, +) \ No newline at end of file diff --git a/aggregates/aggregates_access.py b/aggregates/aggregates_access.py new file mode 100644 index 0000000..186bcec --- /dev/null +++ b/aggregates/aggregates_access.py @@ -0,0 +1,96 @@ +import abc + +class AggregatesDatabaseInterface(abc.ABC): + """ + Interface to access data from aggregates. + """ + + @abc.abstractmethod + def get_aggregate(self, territory_id: str = "", year: str = ""): + """ + Get information about a aggregate. + """ + + @abc.abstractmethod + def get_aggregates_from_state_and_year(self, state_code: str = "", year: str = ""): + """ + Get information about aggregates from state and year. + """ + + @abc.abstractmethod + def get_aggregates_from_territory(self, territory_id: str = "", start_year: str = "", end_year: str = ""): + """ + Get information about all aggregates from territory, within an interval of year. + """ + +class AggregatesAccessInterface(abc.ABC): + """ + Interface to access data from aggregates. + """ + + @abc.abstractmethod + def get_aggregate(self, territory_id: str = "", year: str = ""): + """ + Get information about a aggregate. + """ + + @abc.abstractmethod + def get_aggregates_from_state_and_year(self, state_code: str = "", year: str = ""): + """ + Get information about aggregates from state and year. + """ + + @abc.abstractmethod + def get_aggregates_from_territory(self, territory_id: str = "", start_year: str = "", end_year: str = ""): + """ + Get information about all aggregates from territory, within an interval of year. + """ + +class AggregatesAccess(AggregatesAccessInterface): + _database_gateway = None + + def __init__(self, database_gateway=None): + self._database_gateway = database_gateway + + def get_aggregate(self, territory_id: str = "", year: str = ""): + aggregate_info = self._database_gateway.get_aggregate(territory_id, year) + return aggregate_info + + def get_aggregates_from_state_and_year(self, state_code: str = "", year: str = ""): + aggregates_info = self._database_gateway.get_aggregates_from_state_and_year(state_code, year) + return aggregates_info + + def get_aggregates_from_territory(self, territory_id: str = "", start_year: str = "", end_year: str = ""): + aggregates_info = self._database_gateway.get_aggregates_from_territory(territory_id, start_year, end_year) + return aggregates_info + +class Aggregates: + """ + Item to represente a aggregate in memory inside the module + """ + + def __init__( + self, + territory_id, + url_zip, + year, + last_updated, + hash_info, + file_size + ): + self.territory_id = territory_id + self.url_zip = url_zip + self.year = year + self.last_updated = last_updated + self.hash_info = hash_info + self.file_size = file_size + + def __repr__(self): + return f"Aggregates(territory_id={self.territory_id}, url_zip={self.url_zip}, year={self.year}, last_updated={self.last_updated}, hash_info={self.hash_info}, file_size={self.file_size})" + +def create_aggregates_interface(database_gateway: AggregatesDatabaseInterface) -> AggregatesAccessInterface: + if not isinstance(database_gateway, AggregatesDatabaseInterface): + raise Exception( + "Database gateway should implement the AggregatesDatabaseIntefaze interface" + ) + return AggregatesAccess(database_gateway) \ No newline at end of file diff --git a/config/config.py b/config/config.py index 1f796cd..3afc9e5 100644 --- a/config/config.py +++ b/config/config.py @@ -98,6 +98,11 @@ def __init__(self): self.companies_database_port = os.environ.get("POSTGRES_PORT", "") self.opensearch_user = os.environ.get("QUERIDO_DIARIO_OPENSEARCH_USER", "") self.opensearch_pswd = os.environ.get("QUERIDO_DIARIO_OPENSEARCH_PASSWORD", "") + self.aggregates_database_host = os.environ.get("POSTGRES_AGGREGATES_HOST", "") + self.aggregates_database_db = os.environ.get("POSTGRES_AGGREGATES_DB", "") + self.aggregates_database_user = os.environ.get("POSTGRES_AGGREGATES_USER", "") + self.aggregates_database_pass = os.environ.get("POSTGRES_AGGREGATES_PASSWORD", "") + self.aggregates_database_port = os.environ.get("POSTGRES_AGGREGATES_PORT", "") @classmethod def _load_list(cls, key, default=[]): value = os.environ.get(key, default) diff --git a/config/sample.env b/config/sample.env index 8b4a0e2..6b16fae 100644 --- a/config/sample.env +++ b/config/sample.env @@ -13,6 +13,11 @@ POSTGRES_PASSWORD=companies POSTGRES_DB=companiesdb POSTGRES_HOST=localhost POSTGRES_PORT=5432 +POSTGRES_AGGREGATES_USER=queridodiario +POSTGRES_AGGREGATES_PASSWORD=queridodiario +POSTGRES_AGGREGATES_DB=queridodiariodb +POSTGRES_AGGREGATES_HOST=localhost +POSTGRES_AGGREGATES_PORT=5432 CITY_DATABASE_CSV=censo.csv GAZETTE_OPENSEARCH_INDEX=querido-diario GAZETTE_CONTENT_FIELD=source_text From fc39df7cc50278232e95b4ed391866830b3e366c Mon Sep 17 00:00:00 2001 From: pkbceira03 Date: Tue, 21 May 2024 16:23:23 -0300 Subject: [PATCH 2/8] =?UTF-8?q?Adicionando=20fun=C3=A7=C3=B5es=20de=20busc?= =?UTF-8?q?a=20de=20agregados=20e=20rotas=20da=20postgresql?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: wildemberg-sales --- aggregates/__init__.py | 1 + aggregates/aggregates_access.py | 56 +++++++---------------------- api/api.py | 2 ++ database/__init__.py | 9 ++++- database/postgresql.py | 63 +++++++++++++++++++++++++++++++++ main/__main__.py | 15 ++++++-- 6 files changed, 99 insertions(+), 47 deletions(-) diff --git a/aggregates/__init__.py b/aggregates/__init__.py index da23c65..88e5ff9 100644 --- a/aggregates/__init__.py +++ b/aggregates/__init__.py @@ -3,4 +3,5 @@ AggregatesAccessInterface, AggregatesDatabaseInterface, create_aggregates_interface, + Aggregates ) \ No newline at end of file diff --git a/aggregates/aggregates_access.py b/aggregates/aggregates_access.py index 186bcec..6892a2a 100644 --- a/aggregates/aggregates_access.py +++ b/aggregates/aggregates_access.py @@ -6,77 +6,45 @@ class AggregatesDatabaseInterface(abc.ABC): """ @abc.abstractmethod - def get_aggregate(self, territory_id: str = "", year: str = ""): + def get_aggregates(self, territory_id: str = "", state_code: str = ""): """ Get information about a aggregate. """ - @abc.abstractmethod - def get_aggregates_from_state_and_year(self, state_code: str = "", year: str = ""): - """ - Get information about aggregates from state and year. - """ - - @abc.abstractmethod - def get_aggregates_from_territory(self, territory_id: str = "", start_year: str = "", end_year: str = ""): - """ - Get information about all aggregates from territory, within an interval of year. - """ - class AggregatesAccessInterface(abc.ABC): """ Interface to access data from aggregates. """ @abc.abstractmethod - def get_aggregate(self, territory_id: str = "", year: str = ""): + def get_aggregates(self, territory_id: str = "", state_code: str = ""): """ Get information about a aggregate. """ - @abc.abstractmethod - def get_aggregates_from_state_and_year(self, state_code: str = "", year: str = ""): - """ - Get information about aggregates from state and year. - """ - - @abc.abstractmethod - def get_aggregates_from_territory(self, territory_id: str = "", start_year: str = "", end_year: str = ""): - """ - Get information about all aggregates from territory, within an interval of year. - """ - class AggregatesAccess(AggregatesAccessInterface): _database_gateway = None def __init__(self, database_gateway=None): self._database_gateway = database_gateway - def get_aggregate(self, territory_id: str = "", year: str = ""): - aggregate_info = self._database_gateway.get_aggregate(territory_id, year) + def get_aggregates(self, territory_id: str = "", state_code: str = ""): + aggregate_info = self._database_gateway.get_aggregates(territory_id, state_code) return aggregate_info - def get_aggregates_from_state_and_year(self, state_code: str = "", year: str = ""): - aggregates_info = self._database_gateway.get_aggregates_from_state_and_year(state_code, year) - return aggregates_info - - def get_aggregates_from_territory(self, territory_id: str = "", start_year: str = "", end_year: str = ""): - aggregates_info = self._database_gateway.get_aggregates_from_territory(territory_id, start_year, end_year) - return aggregates_info - class Aggregates: """ Item to represente a aggregate in memory inside the module """ def __init__( - self, - territory_id, - url_zip, - year, - last_updated, - hash_info, - file_size + self, + territory_id, + url_zip, + year, + last_updated, + hash_info, + file_size, ): self.territory_id = territory_id self.url_zip = url_zip @@ -91,6 +59,6 @@ def __repr__(self): def create_aggregates_interface(database_gateway: AggregatesDatabaseInterface) -> AggregatesAccessInterface: if not isinstance(database_gateway, AggregatesDatabaseInterface): raise Exception( - "Database gateway should implement the AggregatesDatabaseIntefaze interface" + "Database gateway should implement the AggregatesDatabaseInterface interface" ) return AggregatesAccess(database_gateway) \ No newline at end of file diff --git a/api/api.py b/api/api.py index 1bad5f8..aa41e36 100644 --- a/api/api.py +++ b/api/api.py @@ -14,6 +14,7 @@ from config.config import load_configuration from themed_excerpts import ThemedExcerptAccessInterface, ThemedExcerptAccessInterface from themed_excerpts.themed_excerpt_access import ThemedExcerptRequest +from aggregates import AggregatesAccessInterface config = load_configuration() @@ -561,6 +562,7 @@ def configure_api_app( cities: CityAccessInterface, suggestion_service: SuggestionServiceInterface, companies: CompaniesAccessInterface, + aggregates: AggregatesAccessInterface, api_root_path=None, ): if not isinstance(gazettes, GazetteAccessInterface): diff --git a/database/__init__.py b/database/__init__.py index 0d0e2de..e8d15ee 100644 --- a/database/__init__.py +++ b/database/__init__.py @@ -1,9 +1,16 @@ from companies import CompaniesDatabaseInterface +from aggregates import AggregatesDatabaseInterface -from .postgresql import PostgreSQLDatabase +from .postgresql import PostgreSQLDatabase, PostgreSQLDatabaseAggregates def create_companies_database_interface( db_host, db_name, db_user, db_pass, db_port ) -> CompaniesDatabaseInterface: return PostgreSQLDatabase(db_host, db_name, db_user, db_pass, db_port) + + +def create_aggregates_database_interface( + db_host, db_name, db_user, db_pass, db_port +) -> AggregatesDatabaseInterface: + return PostgreSQLDatabaseAggregates(db_host, db_name, db_user, db_pass, db_port) diff --git a/database/postgresql.py b/database/postgresql.py index e780c43..95602fd 100644 --- a/database/postgresql.py +++ b/database/postgresql.py @@ -5,6 +5,7 @@ import psycopg2 from companies import Company, InvalidCNPJException, Partner, CompaniesDatabaseInterface +from aggregates import AggregatesDatabaseInterface, Aggregates class PostgreSQLDatabase(CompaniesDatabaseInterface): @@ -224,3 +225,65 @@ def _unsplit_cnpj(self, cnpj_basico: str, cnpj_ordem: str, cnpj_dv: str) -> str: cnpj_ordem=str(cnpj_ordem).zfill(4), cnpj_dv=str(cnpj_dv).zfill(2), ) + +class PostgreSQLDatabaseAggregates(AggregatesDatabaseInterface): + def __init__(self, host, database, user, password, port): + self.host = host + self.database = database + self.user = user + self.password = password + self.port = port + + def _select(self, command: str, data: Dict = {}) -> Iterable[Tuple]: + connection = psycopg2.connect( + dbname=self.database, + user=self.user, + password=self.password, + host=self.host, + port=self.port, + ) + with connection.cursor() as cursor: + cursor.execute(command, data) + logging.debug(f"Starting query: {cursor.query}") + for entry in cursor: + logging.debug(entry) + yield entry + logging.debug(f"Finished query: {cursor.query}") + + def get_aggregates(self, territory_id: str = "", state_code: str = "") -> Union[Aggregates, None]: + if(state_code == ""): + command = """ + SELECT + * + FROM + aggregates + INNERJOIN territories + ON aggregates.territory_id = territories.id + WHERE + territory_id = %(territory_id)s + """ + data = { + "territory_id": territory_id + } + else: + command = """ + SELECT + * + FROM + aggregates + INNERJOIN territories + ON aggregates.territory_id = territories.id + WHERE + territory_id = %(territory_id)s AND state_code = %(state_code)s + """ + data = { + "territory_id": territory_id, + "state_code": state_code + } + + print(result) + result = list(self._select(command, data)) + if result == []: + return None + return result + diff --git a/main/__main__.py b/main/__main__.py index f1dcd92..e4e3b72 100644 --- a/main/__main__.py +++ b/main/__main__.py @@ -4,7 +4,8 @@ from cities import create_cities_data_gateway, create_cities_interface from config import load_configuration from companies import create_companies_interface -from database import create_companies_database_interface +from aggregates import create_aggregates_interface +from database import create_companies_database_interface, create_aggregates_database_interface from gazettes import ( create_gazettes_interface, create_gazettes_data_gateway, @@ -80,13 +81,23 @@ db_port=configuration.companies_database_port, ) companies_interface = create_companies_interface(companies_database) +aggregates_database = create_aggregates_database_interface( + db_host=configuration.aggregates_database_host, + db_name=configuration.aggregates_database_db, + db_user=configuration.aggregates_database_user, + db_pass=configuration.aggregates_database_pass, + db_port=configuration.aggregates_database_port, +) +aggregates_interface = create_aggregates_interface(aggregates_database) + configure_api_app( gazettes_interface, themed_excerpts_interface, cities_interface, suggestion_service, companies_interface, - configuration.root_path, + aggregates_interface, + configuration.root_path ) uvicorn.run(app, host="0.0.0.0", port=8080, root_path=configuration.root_path) From ee79446a3f2453ddc74e94dcfe9de1e2a1d4f2fa Mon Sep 17 00:00:00 2001 From: wildemberg-sales Date: Tue, 21 May 2024 17:52:06 -0300 Subject: [PATCH 3/8] implementando rota para busca dos arquivos agregados Co-authored-by: pkbceira03 --- api/api.py | 31 +++++++++++++++++++++++++++++++ database/postgresql.py | 42 ++++++++++++++++++++++++++++++++---------- 2 files changed, 63 insertions(+), 10 deletions(-) diff --git a/api/api.py b/api/api.py index aa41e36..d4406c9 100644 --- a/api/api.py +++ b/api/api.py @@ -88,6 +88,17 @@ class Entity(BaseModel): class EntitiesSearchResponse(BaseModel): entities: List[Entity] +class Aggregates(BaseModel): + territory_id: str + state_code: str + url_zip: str + year: str + last_updated: datetime + hash_info: str + file_size: str + +class AggregatesSearchResponse(BaseModel): + aggregates: List[Aggregates] @unique class CityLevel(str, Enum): @@ -555,6 +566,21 @@ async def get_partners( return {"total_partners": total_partners, "partners": partners} +@app.get( + "/aggregate/{state_code}", + name="Get aggregates by state code and territory ID", + response_model=AggregatesSearchResponse, + description="Get information about a aggregate by state code and territory ID.", + responses={ + 400: {"model": HTTPExceptionMessage, "description": "City not found."}, + },) +async def get_aggregates(territory_id: str = Query("", description="City's 7-digit IBGE ID."), + state_code: str = Path(..., description="City's state code.")): + try: + aggregates = app.aggregates.get_aggregates(territory_id, state_code) + return JSONResponse(status_code=200, content=aggregates) + except Exception as exc: + return JSONResponse(status_code=404, content={"detail": str(exc)}) def configure_api_app( gazettes: GazetteAccessInterface, @@ -585,6 +611,10 @@ def configure_api_app( raise Exception( "Only CompaniesAccessInterface object are accepted for companies parameter" ) + if not isinstance(aggregates, AggregatesAccessInterface): + raise Exception( + "Only AggregatesAccessInterface object are accepted for aggregates parameter" + ) if api_root_path is not None and type(api_root_path) != str: raise Exception("Invalid api_root_path") app.gazettes = gazettes @@ -592,4 +622,5 @@ def configure_api_app( app.cities = cities app.suggestion_service = suggestion_service app.companies = companies + app.aggregates = aggregates app.root_path = api_root_path diff --git a/database/postgresql.py b/database/postgresql.py index 95602fd..5b1af4b 100644 --- a/database/postgresql.py +++ b/database/postgresql.py @@ -249,21 +249,40 @@ def _select(self, command: str, data: Dict = {}) -> Iterable[Tuple]: logging.debug(entry) yield entry logging.debug(f"Finished query: {cursor.query}") + + def _always_str_or_none(self, data: Any) -> Union[str, None]: + if data == "None" or data == "" or data is None: + return None + elif not isinstance(data, str): + return str(data) + else: + return data + + def _format_aggregates_data(self, data: Tuple) -> Dict: + formatted_data = [self._always_str_or_none(value) for value in data] + return { + "territory_id": formatted_data[1], + "url_zip": formatted_data[2], + "year": formatted_data[3], + "last_updated": formatted_data[4], + "hash_info": formatted_data[5], + "file_size": formatted_data[6] + } def get_aggregates(self, territory_id: str = "", state_code: str = "") -> Union[Aggregates, None]: - if(state_code == ""): + if(territory_id == ""): command = """ SELECT * FROM aggregates - INNERJOIN territories + INNER JOIN territories ON aggregates.territory_id = territories.id WHERE - territory_id = %(territory_id)s + state_code = %(state_code)s """ data = { - "territory_id": territory_id + "state_code": state_code } else: command = """ @@ -271,7 +290,7 @@ def get_aggregates(self, territory_id: str = "", state_code: str = "") -> Union[ * FROM aggregates - INNERJOIN territories + INNER JOIN territories ON aggregates.territory_id = territories.id WHERE territory_id = %(territory_id)s AND state_code = %(state_code)s @@ -281,9 +300,12 @@ def get_aggregates(self, territory_id: str = "", state_code: str = "") -> Union[ "state_code": state_code } - print(result) - result = list(self._select(command, data)) - if result == []: - return None - return result + results = list(self._select(command, data)) + if not results: + return [] + + return ( + [self._format_aggregates_data(result) for result in results] + ) + \ No newline at end of file From 72823f8259b4c879d5af5814288301375ae26c9f Mon Sep 17 00:00:00 2001 From: pkbceira03 Date: Thu, 11 Jul 2024 13:40:19 -0300 Subject: [PATCH 4/8] Adiciona state_code e territory_id Co-authored: ArthurFerreiraRodrigues --- api/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/api.py b/api/api.py index d4406c9..2986391 100644 --- a/api/api.py +++ b/api/api.py @@ -578,7 +578,7 @@ async def get_aggregates(territory_id: str = Query("", description="City's 7-dig state_code: str = Path(..., description="City's state code.")): try: aggregates = app.aggregates.get_aggregates(territory_id, state_code) - return JSONResponse(status_code=200, content=aggregates) + return JSONResponse(status_code=200, content={"state_code":state_code,"territory_id":territory_id,"aggregates":aggregates}) except Exception as exc: return JSONResponse(status_code=404, content={"detail": str(exc)}) From bda6a04ab2c5da1618204ff137726dd9ab3101f2 Mon Sep 17 00:00:00 2001 From: wildemberg-sales Date: Wed, 17 Jul 2024 10:50:03 -0300 Subject: [PATCH 5/8] Implementa filtragem de estado e territorio de agregados --- aggregates/aggregates_access.py | 11 +++++++---- api/api.py | 11 ++++++++--- database/postgresql.py | 27 ++++++++++++++------------- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/aggregates/aggregates_access.py b/aggregates/aggregates_access.py index 6892a2a..50686da 100644 --- a/aggregates/aggregates_access.py +++ b/aggregates/aggregates_access.py @@ -1,4 +1,5 @@ import abc +from typing import Optional class AggregatesDatabaseInterface(abc.ABC): """ @@ -6,7 +7,7 @@ class AggregatesDatabaseInterface(abc.ABC): """ @abc.abstractmethod - def get_aggregates(self, territory_id: str = "", state_code: str = ""): + def get_aggregates(self, territory_id: Optional[str] = None, state_code: str = ""): """ Get information about a aggregate. """ @@ -17,7 +18,7 @@ class AggregatesAccessInterface(abc.ABC): """ @abc.abstractmethod - def get_aggregates(self, territory_id: str = "", state_code: str = ""): + def get_aggregates(self, territory_id: Optional[str] = None, state_code: str = ""): """ Get information about a aggregate. """ @@ -28,7 +29,7 @@ class AggregatesAccess(AggregatesAccessInterface): def __init__(self, database_gateway=None): self._database_gateway = database_gateway - def get_aggregates(self, territory_id: str = "", state_code: str = ""): + def get_aggregates(self, territory_id: Optional[str] = None, state_code: str = ""): aggregate_info = self._database_gateway.get_aggregates(territory_id, state_code) return aggregate_info @@ -40,6 +41,7 @@ class Aggregates: def __init__( self, territory_id, + state_code, url_zip, year, last_updated, @@ -47,6 +49,7 @@ def __init__( file_size, ): self.territory_id = territory_id + self.state_code = state_code self.url_zip = url_zip self.year = year self.last_updated = last_updated @@ -54,7 +57,7 @@ def __init__( self.file_size = file_size def __repr__(self): - return f"Aggregates(territory_id={self.territory_id}, url_zip={self.url_zip}, year={self.year}, last_updated={self.last_updated}, hash_info={self.hash_info}, file_size={self.file_size})" + return f"Aggregates(territory_id={self.territory_id}, state_code={self.state_code}, url_zip={self.url_zip}, year={self.year}, last_updated={self.last_updated}, hash_info={self.hash_info}, file_size={self.file_size})" def create_aggregates_interface(database_gateway: AggregatesDatabaseInterface) -> AggregatesAccessInterface: if not isinstance(database_gateway, AggregatesDatabaseInterface): diff --git a/api/api.py b/api/api.py index 2986391..245e95c 100644 --- a/api/api.py +++ b/api/api.py @@ -574,11 +574,16 @@ async def get_partners( responses={ 400: {"model": HTTPExceptionMessage, "description": "City not found."}, },) -async def get_aggregates(territory_id: str = Query("", description="City's 7-digit IBGE ID."), +async def get_aggregates(territory_id: Optional[str] = Query(None, description="City's 7-digit IBGE ID."), state_code: str = Path(..., description="City's state code.")): try: - aggregates = app.aggregates.get_aggregates(territory_id, state_code) - return JSONResponse(status_code=200, content={"state_code":state_code,"territory_id":territory_id,"aggregates":aggregates}) + aggregates = app.aggregates.get_aggregates(territory_id, state_code.upper()) + return JSONResponse(status_code=200, + content={ + "state_code":state_code.upper(), + "territory_id":territory_id, + "aggregates":aggregates} + ) except Exception as exc: return JSONResponse(status_code=404, content={"detail": str(exc)}) diff --git a/database/postgresql.py b/database/postgresql.py index 5b1af4b..4d0871f 100644 --- a/database/postgresql.py +++ b/database/postgresql.py @@ -1,6 +1,6 @@ import logging import re -from typing import Any, Dict, Iterable, List, Tuple, Union +from typing import Any, Dict, Iterable, List, Tuple, Union, Optional import psycopg2 @@ -262,24 +262,25 @@ def _format_aggregates_data(self, data: Tuple) -> Dict: formatted_data = [self._always_str_or_none(value) for value in data] return { "territory_id": formatted_data[1], - "url_zip": formatted_data[2], - "year": formatted_data[3], - "last_updated": formatted_data[4], - "hash_info": formatted_data[5], - "file_size": formatted_data[6] + "state_code": formatted_data[2], + "url_zip": formatted_data[3], + "year": formatted_data[4], + "last_updated": formatted_data[5], + "hash_info": formatted_data[6], + "file_size": formatted_data[7] } - def get_aggregates(self, territory_id: str = "", state_code: str = "") -> Union[Aggregates, None]: - if(territory_id == ""): + def get_aggregates(self, territory_id: Optional[str] = None, state_code: str = "") -> Union[List[Aggregates], None]: + if territory_id is None: command = """ SELECT * FROM aggregates - INNER JOIN territories - ON aggregates.territory_id = territories.id WHERE state_code = %(state_code)s + AND + territory_id IS NULL """ data = { "state_code": state_code @@ -290,10 +291,10 @@ def get_aggregates(self, territory_id: str = "", state_code: str = "") -> Union[ * FROM aggregates - INNER JOIN territories - ON aggregates.territory_id = territories.id WHERE - territory_id = %(territory_id)s AND state_code = %(state_code)s + territory_id = %(territory_id)s + AND + state_code = %(state_code)s """ data = { "territory_id": territory_id, From e31f17fab85897bfb520d2b8de340afb32720e87 Mon Sep 17 00:00:00 2001 From: wildemberg-sales Date: Mon, 5 Aug 2024 16:19:48 -0300 Subject: [PATCH 6/8] Especifica variaveis de Companies --- Makefile | 24 ++++++++++++------------ config/config.py | 10 +++++----- config/sample.env | 10 +++++----- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index 02891ef..c783ea6 100644 --- a/Makefile +++ b/Makefile @@ -13,12 +13,12 @@ POD_NAME ?= querido-diario DATABASE_CONTAINER_NAME ?= $(POD_NAME)-db OPENSEARCH_CONTAINER_NAME ?= $(POD_NAME)-opensearch # Database info user to run the tests -POSTGRES_USER ?= companies -POSTGRES_PASSWORD ?= companies -POSTGRES_DB ?= companiesdb -POSTGRES_HOST ?= localhost -POSTGRES_PORT ?= 5432 -POSTGRES_IMAGE ?= docker.io/postgres:10 +POSTGRES_COMPANIES_USER ?= companies +POSTGRES_COMPANIES_PASSWORD ?= companies +POSTGRES_COMPANIES_DB ?= companiesdb +POSTGRES_COMPANIES_HOST ?= localhost +POSTGRES_COMPANIES_PORT ?= 5432 +POSTGRES_COMPANIES_IMAGE ?= docker.io/postgres:10 DATABASE_RESTORE_FILE ?= contrib/data/queridodiariodb.tar # Run integration tests. Run local opensearch to validate the iteration RUN_INTEGRATION_TESTS ?= 0 @@ -73,7 +73,7 @@ destroy-pod: create-pod: setup-environment destroy-pod podman pod create --publish $(API_PORT):$(API_PORT) \ - --publish $(POSTGRES_PORT):$(POSTGRES_PORT) \ + --publish $(POSTGRES_COMPANIES_PORT):$(POSTGRES_COMPANIES_PORT) \ --publish $(OPENSEARCH_PORT1):$(OPENSEARCH_PORT1) \ --publish $(OPENSEARCH_PORT2):$(OPENSEARCH_PORT2) \ --name $(POD_NAME) @@ -166,10 +166,10 @@ start-database: podman run -d --rm -ti \ --name $(DATABASE_CONTAINER_NAME) \ --pod $(POD_NAME) \ - -e POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) \ - -e POSTGRES_USER=$(POSTGRES_USER) \ - -e POSTGRES_DB=$(POSTGRES_DB) \ - $(POSTGRES_IMAGE) + -e POSTGRES_COMPANIES_PASSWORD=$(POSTGRES_COMPANIES_PASSWORD) \ + -e POSTGRES_COMPANIES_USER=$(POSTGRES_COMPANIES_USER) \ + -e POSTGRES_COMPANIES_DB=$(POSTGRES_COMPANIES_DB) \ + $(POSTGRES_COMPANIES_IMAGE) wait-database: $(call wait-for, localhost:5432) @@ -177,7 +177,7 @@ wait-database: load-database: ifneq ("$(wildcard $(DATABASE_RESTORE_FILE))","") podman cp $(DATABASE_RESTORE_FILE) $(DATABASE_CONTAINER_NAME):/mnt/dump_file - podman exec $(DATABASE_CONTAINER_NAME) bash -c "pg_restore -v -c -h localhost -U $(POSTGRES_USER) -d $(POSTGRES_DB) /mnt/dump_file || true" + podman exec $(DATABASE_CONTAINER_NAME) bash -c "pg_restore -v -c -h localhost -U $(POSTGRES_COMPANIES_USER) -d $(POSTGRES_COMPANIES_DB) /mnt/dump_file || true" else @echo "cannot restore because file does not exists '$(DATABASE_RESTORE_FILE)'" @exit 1 diff --git a/config/config.py b/config/config.py index 3afc9e5..684d744 100644 --- a/config/config.py +++ b/config/config.py @@ -91,11 +91,11 @@ def __init__(self): self.themed_excerpt_number_of_fragments = int( os.environ.get("THEMED_EXCERPT_NUMBER_OF_FRAGMENTS", 1) ) - self.companies_database_host = os.environ.get("POSTGRES_HOST", "") - self.companies_database_db = os.environ.get("POSTGRES_DB", "") - self.companies_database_user = os.environ.get("POSTGRES_USER", "") - self.companies_database_pass = os.environ.get("POSTGRES_PASSWORD", "") - self.companies_database_port = os.environ.get("POSTGRES_PORT", "") + self.companies_database_host = os.environ.get("POSTGRES_COMPANIES_HOST", "") + self.companies_database_db = os.environ.get("POSTGRES_COMPANIES_DB", "") + self.companies_database_user = os.environ.get("POSTGRES_COMPANIES_USER", "") + self.companies_database_pass = os.environ.get("POSTGRES_COMPANIES_PASSWORD", "") + self.companies_database_port = os.environ.get("POSTGRES_COMPANIES_PORT", "") self.opensearch_user = os.environ.get("QUERIDO_DIARIO_OPENSEARCH_USER", "") self.opensearch_pswd = os.environ.get("QUERIDO_DIARIO_OPENSEARCH_PASSWORD", "") self.aggregates_database_host = os.environ.get("POSTGRES_AGGREGATES_HOST", "") diff --git a/config/sample.env b/config/sample.env index 6b16fae..a401e36 100644 --- a/config/sample.env +++ b/config/sample.env @@ -8,11 +8,11 @@ QUERIDO_DIARIO_SUGGESTION_SENDER_EMAIL=example@email.com QUERIDO_DIARIO_SUGGESTION_RECIPIENT_NAME=Recipient Name QUERIDO_DIARIO_SUGGESTION_RECIPIENT_EMAIL=example@email.com QUERIDO_DIARIO_SUGGESTION_MAILJET_CUSTOM_ID=AppCustomID -POSTGRES_USER=companies -POSTGRES_PASSWORD=companies -POSTGRES_DB=companiesdb -POSTGRES_HOST=localhost -POSTGRES_PORT=5432 +POSTGRES_COMPANIES_USER=companies +POSTGRES_COMPANIES_PASSWORD=companies +POSTGRES_COMPANIES_DB=companiesdb +POSTGRES_COMPANIES_HOST=localhost +POSTGRES_COMPANIES_PORT=5432 POSTGRES_AGGREGATES_USER=queridodiario POSTGRES_AGGREGATES_PASSWORD=queridodiario POSTGRES_AGGREGATES_DB=queridodiariodb From 4e3cad6da4be17657e228d9d6378870aec9aa227 Mon Sep 17 00:00:00 2001 From: wildemberg-sales Date: Mon, 5 Aug 2024 16:20:59 -0300 Subject: [PATCH 7/8] =?UTF-8?q?Implementa=20heran=C3=A7a=20de=20classe=20p?= =?UTF-8?q?ara=20reutiliza=C3=A7=C3=A3o=20de=20c=C3=B3digo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refatora verificação de erros na rota de agregados e Corrige nome de variáveis Remove try-except da rota de agregados Remove __init__ das classes do postgres de aggregates e companies Refatora método de retorno de dados dos agregados Implementa variavel .env do bucket e retorna com file_path do agregado --- aggregates/aggregates_access.py | 14 ++-- api/api.py | 32 ++++---- config/sample.env | 1 + database/__init__.py | 4 +- database/postgresql.py | 125 ++++++++++++-------------------- 5 files changed, 72 insertions(+), 104 deletions(-) diff --git a/aggregates/aggregates_access.py b/aggregates/aggregates_access.py index 50686da..66ad2c5 100644 --- a/aggregates/aggregates_access.py +++ b/aggregates/aggregates_access.py @@ -1,5 +1,5 @@ import abc -from typing import Optional +from typing import Optional, Dict class AggregatesDatabaseInterface(abc.ABC): """ @@ -42,23 +42,23 @@ def __init__( self, territory_id, state_code, - url_zip, + file_path, year, last_updated, hash_info, - file_size, + file_size_mb, ): self.territory_id = territory_id self.state_code = state_code - self.url_zip = url_zip + self.file_path = file_path self.year = year self.last_updated = last_updated self.hash_info = hash_info - self.file_size = file_size + self.file_size_mb = file_size_mb def __repr__(self): - return f"Aggregates(territory_id={self.territory_id}, state_code={self.state_code}, url_zip={self.url_zip}, year={self.year}, last_updated={self.last_updated}, hash_info={self.hash_info}, file_size={self.file_size})" - + return f"Aggregates(territory_id={self.territory_id}, state_code={self.state_code}, file_path={self.file_path}, year={self.year}, last_updated={self.last_updated}, hash_info={self.hash_info}, file_size_mb={self.file_size_mb})" + def create_aggregates_interface(database_gateway: AggregatesDatabaseInterface) -> AggregatesAccessInterface: if not isinstance(database_gateway, AggregatesDatabaseInterface): raise Exception( diff --git a/api/api.py b/api/api.py index 245e95c..18c71ff 100644 --- a/api/api.py +++ b/api/api.py @@ -91,11 +91,11 @@ class EntitiesSearchResponse(BaseModel): class Aggregates(BaseModel): territory_id: str state_code: str - url_zip: str + file_path: str year: str last_updated: datetime hash_info: str - file_size: str + file_size_mb: str class AggregatesSearchResponse(BaseModel): aggregates: List[Aggregates] @@ -567,25 +567,27 @@ async def get_partners( return {"total_partners": total_partners, "partners": partners} @app.get( - "/aggregate/{state_code}", - name="Get aggregates by state code and territory ID", + "/aggregates/{state_code}", + name="Get aggregated data files by state code and optionally territory ID", response_model=AggregatesSearchResponse, description="Get information about a aggregate by state code and territory ID.", responses={ - 400: {"model": HTTPExceptionMessage, "description": "City not found."}, + 404: {"model": HTTPExceptionMessage, "description": "State and/or city not found."}, },) async def get_aggregates(territory_id: Optional[str] = Query(None, description="City's 7-digit IBGE ID."), state_code: str = Path(..., description="City's state code.")): - try: - aggregates = app.aggregates.get_aggregates(territory_id, state_code.upper()) - return JSONResponse(status_code=200, - content={ - "state_code":state_code.upper(), - "territory_id":territory_id, - "aggregates":aggregates} - ) - except Exception as exc: - return JSONResponse(status_code=404, content={"detail": str(exc)}) + + aggregates = app.aggregates.get_aggregates(territory_id, state_code.upper()) + + if not aggregates: + return JSONResponse(status_code=404, content={"detail":"No aggregate file was found for the data reported."}) + + return JSONResponse(status_code=200, + content={ + "state_code":state_code.upper(), + "territory_id":territory_id, + "aggregates":aggregates} + ) def configure_api_app( gazettes: GazetteAccessInterface, diff --git a/config/sample.env b/config/sample.env index a401e36..3b8ccbd 100644 --- a/config/sample.env +++ b/config/sample.env @@ -8,6 +8,7 @@ QUERIDO_DIARIO_SUGGESTION_SENDER_EMAIL=example@email.com QUERIDO_DIARIO_SUGGESTION_RECIPIENT_NAME=Recipient Name QUERIDO_DIARIO_SUGGESTION_RECIPIENT_EMAIL=example@email.com QUERIDO_DIARIO_SUGGESTION_MAILJET_CUSTOM_ID=AppCustomID +QUERIDO_DIARIO_FILES_ENDPOINT=s3://queridodiariobucket/ POSTGRES_COMPANIES_USER=companies POSTGRES_COMPANIES_PASSWORD=companies POSTGRES_COMPANIES_DB=companiesdb diff --git a/database/__init__.py b/database/__init__.py index e8d15ee..5ba06e1 100644 --- a/database/__init__.py +++ b/database/__init__.py @@ -1,13 +1,13 @@ from companies import CompaniesDatabaseInterface from aggregates import AggregatesDatabaseInterface -from .postgresql import PostgreSQLDatabase, PostgreSQLDatabaseAggregates +from .postgresql import PostgreSQLDatabaseCompanies, PostgreSQLDatabaseAggregates def create_companies_database_interface( db_host, db_name, db_user, db_pass, db_port ) -> CompaniesDatabaseInterface: - return PostgreSQLDatabase(db_host, db_name, db_user, db_pass, db_port) + return PostgreSQLDatabaseCompanies(db_host, db_name, db_user, db_pass, db_port) def create_aggregates_database_interface( diff --git a/database/postgresql.py b/database/postgresql.py index 4d0871f..93a73d9 100644 --- a/database/postgresql.py +++ b/database/postgresql.py @@ -1,5 +1,6 @@ import logging import re +import os from typing import Any, Dict, Iterable, List, Tuple, Union, Optional import psycopg2 @@ -7,15 +8,14 @@ from companies import Company, InvalidCNPJException, Partner, CompaniesDatabaseInterface from aggregates import AggregatesDatabaseInterface, Aggregates - -class PostgreSQLDatabase(CompaniesDatabaseInterface): +class PostgreSQLDatabase: def __init__(self, host, database, user, password, port): self.host = host self.database = database self.user = user self.password = password self.port = port - + def _select(self, command: str, data: Dict = {}) -> Iterable[Tuple]: connection = psycopg2.connect( dbname=self.database, @@ -31,6 +31,16 @@ def _select(self, command: str, data: Dict = {}) -> Iterable[Tuple]: logging.debug(entry) yield entry logging.debug(f"Finished query: {cursor.query}") + + def _always_str_or_none(self, data: Any) -> Union[str, None]: + if data == "None" or data == "" or data is None: + return None + elif not isinstance(data, str): + return str(data) + else: + return data + +class PostgreSQLDatabaseCompanies(PostgreSQLDatabase, CompaniesDatabaseInterface): def get_company(self, cnpj: str = "") -> Union[Company, None]: command = """ @@ -154,14 +164,6 @@ def _format_partner_data(self, data: Tuple, cnpj: str) -> Partner: faixa_etaria=formatted_data[11], ) - def _always_str_or_none(self, data: Any) -> Union[str, None]: - if data == "None" or data == "" or data is None: - return None - elif not isinstance(data, str): - return str(data) - else: - return data - def _is_valid_cnpj(self, cnpj: str) -> bool: cnpj_only_digits = self._cnpj_only_digits(cnpj) if cnpj_only_digits == "" or len(cnpj_only_digits) > 14: @@ -226,87 +228,50 @@ def _unsplit_cnpj(self, cnpj_basico: str, cnpj_ordem: str, cnpj_dv: str) -> str: cnpj_dv=str(cnpj_dv).zfill(2), ) -class PostgreSQLDatabaseAggregates(AggregatesDatabaseInterface): - def __init__(self, host, database, user, password, port): - self.host = host - self.database = database - self.user = user - self.password = password - self.port = port +class PostgreSQLDatabaseAggregates(PostgreSQLDatabase, AggregatesDatabaseInterface): - def _select(self, command: str, data: Dict = {}) -> Iterable[Tuple]: - connection = psycopg2.connect( - dbname=self.database, - user=self.user, - password=self.password, - host=self.host, - port=self.port, - ) - with connection.cursor() as cursor: - cursor.execute(command, data) - logging.debug(f"Starting query: {cursor.query}") - for entry in cursor: - logging.debug(entry) - yield entry - logging.debug(f"Finished query: {cursor.query}") - - def _always_str_or_none(self, data: Any) -> Union[str, None]: - if data == "None" or data == "" or data is None: - return None - elif not isinstance(data, str): - return str(data) - else: - return data - - def _format_aggregates_data(self, data: Tuple) -> Dict: + def _format_aggregates_data(self, data: Tuple) -> Aggregates: formatted_data = [self._always_str_or_none(value) for value in data] - return { - "territory_id": formatted_data[1], - "state_code": formatted_data[2], - "url_zip": formatted_data[3], - "year": formatted_data[4], - "last_updated": formatted_data[5], - "hash_info": formatted_data[6], - "file_size": formatted_data[7] - } + return Aggregates( + territory_id=formatted_data[1], + state_code=formatted_data[2], + file_path=os.environ.get("QUERIDO_DIARIO_FILES_ENDPOINT","")+formatted_data[4], + year=formatted_data[3], + hash_info=formatted_data[6], + file_size_mb=formatted_data[5], + last_updated=formatted_data[7] + ) def get_aggregates(self, territory_id: Optional[str] = None, state_code: str = "") -> Union[List[Aggregates], None]: + command = """ + SELECT + * + FROM + aggregates + WHERE + state_code = %(state_code)s + AND + territory_id {territory_id_query_statement} + ORDER BY year DESC + """ + + data = { + "state_code": state_code + } + if territory_id is None: - command = """ - SELECT - * - FROM - aggregates - WHERE - state_code = %(state_code)s - AND - territory_id IS NULL - """ - data = { - "state_code": state_code - } + command = command.format(territory_id_query_statement="IS NULL") else: - command = """ - SELECT - * - FROM - aggregates - WHERE - territory_id = %(territory_id)s - AND - state_code = %(state_code)s - """ - data = { - "territory_id": territory_id, - "state_code": state_code - } + data["territory_id"] = territory_id + command = command.format(territory_id_query_statement="= %(territory_id)s") results = list(self._select(command, data)) + if not results: return [] return ( - [self._format_aggregates_data(result) for result in results] + [vars(self._format_aggregates_data(result)) for result in results] ) \ No newline at end of file From f06658bb5cb2dc48a249f9bd2b969ecf56502003 Mon Sep 17 00:00:00 2001 From: Wildemberg Sales Date: Thu, 15 Aug 2024 11:28:28 -0300 Subject: [PATCH 8/8] =?UTF-8?q?Implementa=20vari=C3=A1vel=20de=20ambiente?= =?UTF-8?q?=20local=20do=20bucket?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Wildemberg Sales --- config/sample.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/sample.env b/config/sample.env index 3b8ccbd..41fb740 100644 --- a/config/sample.env +++ b/config/sample.env @@ -8,7 +8,7 @@ QUERIDO_DIARIO_SUGGESTION_SENDER_EMAIL=example@email.com QUERIDO_DIARIO_SUGGESTION_RECIPIENT_NAME=Recipient Name QUERIDO_DIARIO_SUGGESTION_RECIPIENT_EMAIL=example@email.com QUERIDO_DIARIO_SUGGESTION_MAILJET_CUSTOM_ID=AppCustomID -QUERIDO_DIARIO_FILES_ENDPOINT=s3://queridodiariobucket/ +QUERIDO_DIARIO_FILES_ENDPOINT=http://localhost:9000/queridodiariobucket/ POSTGRES_COMPANIES_USER=companies POSTGRES_COMPANIES_PASSWORD=companies POSTGRES_COMPANIES_DB=companiesdb