Skip to content

Commit 72f1986

Browse files
committed
fix: lint fixes
1 parent 16afecc commit 72f1986

File tree

8 files changed

+386
-286
lines changed

8 files changed

+386
-286
lines changed

deployment/migrations/versions/0033_1c06d0ade60c_calculate_costs_statically.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
"""
88

99
from decimal import Decimal
10-
from typing import Dict
1110
from alembic import op
1211
import sqlalchemy as sa
1312
import logging
@@ -18,7 +17,7 @@
1817
from aleph.db.accessors.messages import get_message_by_item_hash
1918
from aleph.services.cost import _is_confidential_vm, get_detailed_costs, CostComputableContent
2019
from aleph.services.pricing_utils import build_default_pricing_model
21-
from aleph.types.cost import ProductPriceType, ProductPricing
20+
from aleph.types.cost import ProductPriceType
2221
from aleph.types.db_session import DbSession
2322

2423
logger = logging.getLogger("alembic")

src/aleph/services/pricing_utils.py

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import datetime as dt
66
from typing import Dict, List, Union
77

8-
from aleph.db.accessors.aggregates import get_aggregate_elements, merge_aggregate_elements
8+
from aleph.db.accessors.aggregates import (
9+
get_aggregate_elements,
10+
merge_aggregate_elements,
11+
)
912
from aleph.db.models import AggregateElementDb
1013
from aleph.toolkit.constants import (
1114
DEFAULT_PRICE_AGGREGATE,
@@ -16,22 +19,24 @@
1619
from aleph.types.db_session import DbSession
1720

1821

19-
def build_pricing_model_from_aggregate(aggregate_content: Dict[Union[ProductPriceType, str], dict]) -> Dict[ProductPriceType, ProductPricing]:
22+
def build_pricing_model_from_aggregate(
23+
aggregate_content: Dict[Union[ProductPriceType, str], dict]
24+
) -> Dict[ProductPriceType, ProductPricing]:
2025
"""
2126
Build a complete pricing model from an aggregate content dictionary.
22-
23-
This function converts the DEFAULT_PRICE_AGGREGATE format or any pricing aggregate
24-
content into a dictionary of ProductPricing objects that can be used by the cost
27+
28+
This function converts the DEFAULT_PRICE_AGGREGATE format or any pricing aggregate
29+
content into a dictionary of ProductPricing objects that can be used by the cost
2530
calculation functions.
26-
31+
2732
Args:
2833
aggregate_content: Dictionary containing pricing information with ProductPriceType as keys
29-
34+
3035
Returns:
3136
Dictionary mapping ProductPriceType to ProductPricing objects
3237
"""
3338
pricing_model: Dict[ProductPriceType, ProductPricing] = {}
34-
39+
3540
for price_type, pricing_data in aggregate_content.items():
3641
try:
3742
price_type = ProductPriceType(price_type)
@@ -41,16 +46,17 @@ def build_pricing_model_from_aggregate(aggregate_content: Dict[Union[ProductPric
4146
except (KeyError, ValueError) as e:
4247
# Log the error but continue processing other price types
4348
import logging
49+
4450
logger = logging.getLogger(__name__)
4551
logger.warning(f"Failed to parse pricing for {price_type}: {e}")
46-
52+
4753
return pricing_model
4854

4955

5056
def build_default_pricing_model() -> Dict[ProductPriceType, ProductPricing]:
5157
"""
5258
Build the default pricing model from DEFAULT_PRICE_AGGREGATE constant.
53-
59+
5460
Returns:
5561
Dictionary mapping ProductPriceType to ProductPricing objects
5662
"""
@@ -60,55 +66,57 @@ def build_default_pricing_model() -> Dict[ProductPriceType, ProductPricing]:
6066
def get_pricing_aggregate_history(session: DbSession) -> List[AggregateElementDb]:
6167
"""
6268
Get all pricing aggregate updates in chronological order.
63-
69+
6470
Args:
6571
session: Database session
66-
72+
6773
Returns:
6874
List of AggregateElementDb objects ordered by creation_datetime
6975
"""
7076
aggregate_elements = get_aggregate_elements(
71-
session=session,
72-
owner=PRICE_AGGREGATE_OWNER,
73-
key=PRICE_AGGREGATE_KEY
77+
session=session, owner=PRICE_AGGREGATE_OWNER, key=PRICE_AGGREGATE_KEY
7478
)
7579
return list(aggregate_elements)
7680

7781

78-
def get_pricing_timeline(session: DbSession) -> List[tuple[dt.datetime, Dict[ProductPriceType, ProductPricing]]]:
82+
def get_pricing_timeline(
83+
session: DbSession,
84+
) -> List[tuple[dt.datetime, Dict[ProductPriceType, ProductPricing]]]:
7985
"""
8086
Get the complete pricing timeline with timestamps and pricing models.
81-
87+
8288
This function returns a chronologically ordered list of pricing changes,
83-
useful for processing messages in chronological order and applying the
89+
useful for processing messages in chronological order and applying the
8490
correct pricing at each point in time.
85-
91+
8692
This properly merges aggregate elements up to each point in time to create
8793
the cumulative pricing state, similar to how _update_aggregate works.
88-
94+
8995
Args:
9096
session: Database session
91-
97+
9298
Returns:
9399
List of tuples containing (timestamp, pricing_model)
94100
"""
95101
pricing_elements = get_pricing_aggregate_history(session)
96-
102+
97103
timeline = []
98-
104+
99105
# Add default pricing as the initial state
100-
timeline.append((dt.datetime.min.replace(tzinfo=dt.timezone.utc), build_default_pricing_model()))
101-
106+
timeline.append(
107+
(dt.datetime.min.replace(tzinfo=dt.timezone.utc), build_default_pricing_model())
108+
)
109+
102110
# Build cumulative pricing models by merging elements up to each timestamp
103111
elements_so_far = []
104112
for element in pricing_elements:
105113
elements_so_far.append(element)
106-
114+
107115
# Merge all elements up to this point to get the cumulative state
108116
merged_content = merge_aggregate_elements(elements_so_far)
109-
117+
110118
# Build pricing model from the merged content
111119
pricing_model = build_pricing_model_from_aggregate(merged_content)
112120
timeline.append((element.creation_datetime, pricing_model))
113-
114-
return timeline
121+
122+
return timeline

src/aleph/toolkit/constants.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1+
from typing import Dict, Union
2+
3+
from aleph.types.cost import ProductPriceType
4+
15
KiB = 1024
26
MiB = 1024 * 1024
37
GiB = 1024 * 1024 * 1024
48

59
MINUTE = 60
610
HOUR = 60 * MINUTE
711

8-
from aleph.types.cost import ProductPriceType
9-
1012
PRICE_AGGREGATE_OWNER = "0xFba561a84A537fCaa567bb7A2257e7142701ae2A"
1113
PRICE_AGGREGATE_KEY = "pricing"
1214
PRICE_PRECISION = 18
13-
DEFAULT_PRICE_AGGREGATE = {
15+
DEFAULT_PRICE_AGGREGATE: Dict[Union[ProductPriceType, str], dict] = {
1416
ProductPriceType.PROGRAM: {
1517
"price": {
1618
"storage": {"payg": "0.000000977", "holding": "0.05"},
@@ -50,7 +52,9 @@
5052
"memory_mib": 2048,
5153
},
5254
},
53-
ProductPriceType.WEB3_HOSTING: {"price": {"fixed": 50, "storage": {"holding": "0.333333333"}}},
55+
ProductPriceType.WEB3_HOSTING: {
56+
"price": {"fixed": 50, "storage": {"holding": "0.333333333"}}
57+
},
5458
ProductPriceType.PROGRAM_PERSISTENT: {
5559
"price": {
5660
"storage": {"payg": "0.000000977", "holding": "0.05"},

src/aleph/web/controllers/prices.py

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,17 @@
2121
)
2222
from aleph.services.cost import (
2323
_get_product_price_type,
24-
_get_settings,
2524
get_detailed_costs,
2625
get_payment_type,
2726
get_total_and_detailed_costs,
2827
get_total_and_detailed_costs_from_db,
2928
)
3029
from aleph.services.pricing_utils import get_pricing_timeline
30+
from aleph.toolkit.constants import DEFAULT_SETTINGS_AGGREGATE
3131
from aleph.toolkit.costs import format_cost_str
3232
from aleph.types.db_session import DbSession
3333
from aleph.types.message_status import MessageStatus
34+
from aleph.types.settings import Settings
3435
from aleph.web.controllers.app_state_getters import (
3536
get_session_factory_from_request,
3637
get_storage_service_from_request,
@@ -174,7 +175,7 @@ async def message_price_estimate(request: web.Request):
174175

175176
async def recalculate_message_costs(request: web.Request):
176177
"""Force recalculation of message costs in chronological order with historical pricing.
177-
178+
178179
This endpoint will:
179180
1. Get all messages that need cost recalculation (if item_hash provided, just that message)
180181
2. Get the pricing timeline to track price changes over time
@@ -183,15 +184,15 @@ async def recalculate_message_costs(request: web.Request):
183184
5. Delete existing cost entries and recalculate with historical pricing
184185
6. Store the new cost calculations
185186
"""
186-
187+
187188
session_factory = get_session_factory_from_request(request)
188-
189+
189190
# Check if a specific message hash was provided
190191
item_hash_param = request.match_info.get("item_hash")
191-
192+
192193
with session_factory() as session:
193194
messages_to_recalculate: List[MessageDb] = []
194-
195+
195196
if item_hash_param:
196197
# Recalculate costs for a specific message
197198
try:
@@ -203,78 +204,99 @@ async def recalculate_message_costs(request: web.Request):
203204
# Recalculate costs for all executable messages, ordered by time (oldest first)
204205
select_stmt = (
205206
select(MessageDb)
206-
.where(MessageDb.type.in_([MessageType.instance, MessageType.program, MessageType.store]))
207+
.where(
208+
MessageDb.type.in_(
209+
[MessageType.instance, MessageType.program, MessageType.store]
210+
)
211+
)
207212
.order_by(MessageDb.time.asc())
208213
)
209214
result = session.execute(select_stmt)
210215
messages_to_recalculate = result.scalars().all()
211-
216+
212217
if not messages_to_recalculate:
213218
return web.json_response(
214-
{"message": "No messages found for cost recalculation", "recalculated_count": 0}
219+
{
220+
"message": "No messages found for cost recalculation",
221+
"recalculated_count": 0,
222+
}
215223
)
216-
224+
217225
# Get the pricing timeline to track price changes over time
218226
pricing_timeline = get_pricing_timeline(session)
219227
LOGGER.info(f"Found {len(pricing_timeline)} pricing changes in timeline")
220-
228+
221229
recalculated_count = 0
222230
errors = []
223231
current_pricing_model = None
224232
current_pricing_index = 0
225-
233+
226234
for message in messages_to_recalculate:
227235
try:
228236
# Find the applicable pricing model for this message's timestamp
229-
while (current_pricing_index < len(pricing_timeline) - 1 and
230-
pricing_timeline[current_pricing_index + 1][0] <= message.time):
237+
while (
238+
current_pricing_index < len(pricing_timeline) - 1
239+
and pricing_timeline[current_pricing_index + 1][0] <= message.time
240+
):
231241
current_pricing_index += 1
232-
242+
233243
current_pricing_model = pricing_timeline[current_pricing_index][1]
234244
pricing_timestamp = pricing_timeline[current_pricing_index][0]
235-
236-
LOGGER.debug(f"Message {message.item_hash} at {message.time} using pricing from {pricing_timestamp}")
237-
245+
246+
LOGGER.debug(
247+
f"Message {message.item_hash} at {message.time} using pricing from {pricing_timestamp}"
248+
)
249+
238250
# Delete existing cost entries for this message
239251
delete_costs_for_message(session, message.item_hash)
240-
252+
241253
# Get the message content and determine product type
242254
content: ExecutableContent = message.parsed_content
243-
product_type = _get_product_price_type(content, None, current_pricing_model)
244-
255+
256+
# TODO: Calculate settings timeline
257+
settings = Settings.from_aggregate(DEFAULT_SETTINGS_AGGREGATE)
258+
259+
product_type = _get_product_price_type(
260+
content, settings, current_pricing_model
261+
)
262+
245263
# Get the pricing for this specific product type
246264
if product_type not in current_pricing_model:
247-
LOGGER.warning(f"Product type {product_type} not found in pricing model for message {message.item_hash}")
265+
LOGGER.warning(
266+
f"Product type {product_type} not found in pricing model for message {message.item_hash}"
267+
)
248268
continue
249-
269+
250270
pricing = current_pricing_model[product_type]
251-
271+
252272
# Calculate new costs using the historical pricing model
253-
new_costs = get_detailed_costs(session, content, message.item_hash, pricing)
254-
273+
new_costs = get_detailed_costs(
274+
session, content, message.item_hash, pricing
275+
)
276+
255277
if new_costs:
256278
# Store the new cost calculations
257279
upsert_stmt = make_costs_upsert_query(new_costs)
258280
session.execute(upsert_stmt)
259-
281+
260282
recalculated_count += 1
261-
283+
262284
except Exception as e:
263285
error_msg = f"Failed to recalculate costs for message {message.item_hash}: {str(e)}"
264286
LOGGER.error(error_msg)
265287
errors.append({"item_hash": message.item_hash, "error": str(e)})
266-
288+
267289
# Commit all changes
268290
session.commit()
269-
291+
270292
response_data = {
271293
"message": "Cost recalculation completed with historical pricing",
272294
"recalculated_count": recalculated_count,
273295
"total_messages": len(messages_to_recalculate),
274-
"pricing_changes_found": len(pricing_timeline)
296+
"pricing_changes_found": len(pricing_timeline),
275297
}
276-
298+
277299
if errors:
278300
response_data["errors"] = errors
279-
301+
280302
return web.json_response(response_data)

0 commit comments

Comments
 (0)