Skip to content

Commit

Permalink
Update backfilling asset cohorts
Browse files Browse the repository at this point in the history
* Limit backfilling of cohorts only to last 5 days
* Extract methods from `restore_missing_cohorts`:
  * `_get_asset_cohorts_snapshots` - checks whether there are enough
    snapshots to perform backfill
  * `restore_missing_cohorts` - produces queries for backfilling cohorts
  * `save_restored_asset_cohort` - executes backfilling query
* Add tests

Change-Id: I996a60bcc9a7b46a57b1f273ef797068bba7b68b
  • Loading branch information
AVMarkin committed Jul 3, 2024
1 parent 5ee91a5 commit 8f113d0
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 45 deletions.
163 changes: 118 additions & 45 deletions app/scripts/backfill_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import logging
from bisect import bisect_left
from collections.abc import Sequence
from datetime import datetime, timedelta
import datetime
from typing import Generator

import gaarf
import numpy as np
Expand All @@ -29,7 +30,7 @@
from gaarf.executors import bq_executor
from gaarf.io.writers import bigquery_writer
from google.api_core import exceptions as google_api_exceptions
from scripts.src import queries
from src import queries


def get_new_date_for_missing_incremental_snapshots(
Expand Down Expand Up @@ -264,60 +265,119 @@ def _format_partial_event_history(
return events.groupby(['day', 'campaign_id']).last()


def restore_missing_cohorts(
def _get_asset_cohorts_snapshots(
  bigquery_executor: bq_executor.BigQueryExecutor, bq_dataset: str
) -> set[datetime.date]:
  """Gets dates of asset cohorts snapshots for the last 5 days.

  Looks up suffixes of `conversion_lags_*` tables in the dataset whose suffix
  falls into the 5 days preceding the current date (current date excluded).

  Args:
    bigquery_executor: Instantiated executor to query BigQuery.
    bq_dataset: BigQuery dataset where snapshots are stored.

  Returns:
    Days when snapshots are present. The set is empty when the lookup fails
    or when no snapshots exist in that period.
  """
  try:
    result = bigquery_executor.execute(
      'conversion_lags_snapshots',
      f"""
      SELECT DISTINCT _TABLE_SUFFIX AS day
      FROM
      `{bq_dataset}.conversion_lags_*`
      WHERE
      _TABLE_SUFFIX >= FORMAT_DATE(
      "%Y%m%d",DATE_SUB(CURRENT_DATE, INTERVAL 5 DAY)
      )
      AND _TABLE_SUFFIX < FORMAT_DATE( "%Y%m%d",CURRENT_DATE)
      ORDER BY 1
      """,
    )
  except bq_executor.BigQueryExecutorException:
    # Return right away: `result` is never bound when the query fails, so
    # falling through to the check below would raise UnboundLocalError.
    logging.warning('failed to get data')
    return set()
  if result and (snapshots_days := set(result.day)):
    # Table suffixes come back as 'YYYYMMDD' strings.
    return {
      datetime.datetime.strptime(date, '%Y%m%d').date()
      for date in snapshots_days
    }
  logging.warning('No available asset cohorts snapshots for the last 5 days')
  return set()


def restore_missing_cohorts(
  snapshot_dates: set[datetime.date], bq_dataset: str
) -> Generator[tuple[str, str], None, None]:
  """Builds queries that restore missing asset cohorts snapshots.

  Every day between the earliest and the latest available snapshot that has
  no snapshot of its own is restored from the closest earlier snapshot, with
  `lag` increased by the number of days between the two dates.

  Args:
    snapshot_dates: Dates when cohort snapshots are present.
    bq_dataset: BigQuery dataset to save data to.

  Yields:
    Tuples of table id and the query that restores this table, ordered by
    the date of the restored snapshot.
  """
  if not snapshot_dates:
    # Nothing to restore from; min()/max() below would fail on empty input.
    logging.info('No asset cohort snapshots to backfill')
    return
  available_dates = sorted(snapshot_dates)
  first_date, last_date = available_dates[0], available_dates[-1]
  missing_dates = [
    day
    for offset in range((last_date - first_date).days + 1)
    if (day := first_date + datetime.timedelta(days=offset))
    not in snapshot_dates
  ]
  if not missing_dates:
    logging.info('No asset cohort snapshots to backfill')
    return
  for date in missing_dates:
    # A missing date always lies strictly between the first and the last
    # snapshot, so there is always an earlier snapshot at `index - 1`.
    index = bisect_left(available_dates, date)
    last_available_date = available_dates[index - 1]
    date_diff = (date - last_available_date).days
    table_id = f'{bq_dataset}.conversion_lags_{date.strftime("%Y%m%d")}'
    last_available_date_table_id = last_available_date.strftime('%Y%m%d')
    yield (
      table_id,
      f"""
      CREATE OR REPLACE TABLE `{table_id}`
      AS
      SELECT
      day_of_interaction,
      lag+{date_diff} AS lag,
      ad_group_id,
      asset_id,
      field_type,
      network AS network,
      installs,
      inapps,
      view_through_conversions,
      conversions_value
      FROM `{bq_dataset}.conversion_lags_{last_available_date_table_id}`
      """,
    )


def save_restored_asset_cohort(
  bigquery_executor: bq_executor.BigQueryExecutor, query: str, table_id: str
) -> None:
  """Executes query for backfilling a single asset cohort snapshot.

  Failures are logged and not re-raised, so one broken snapshot does not
  prevent the remaining snapshots from being restored.

  Args:
    bigquery_executor: Instantiated executor to write data to BigQuery.
    query: Query for snapshot backfilling.
    table_id: Full name of the table the snapshot is saved to.
  """
  try:
    bigquery_executor.execute('restore_conversion_lag_snapshot', query)
  except bq_executor.BigQueryExecutorException as e:
    # The backfilling query is `CREATE OR REPLACE TABLE`, so an existing table
    # cannot be the reason of a failure; report the actual error instead.
    logging.warning("failed to create table '%s': %s", table_id, e)
  else:
    logging.info("table '%s' has been created", table_id)


def _get_bid_budget_snapshot_dates(
Expand Down Expand Up @@ -377,7 +437,9 @@ def save_restored_change_history(
daily_history = restored_bid_budget_history.loc[
restored_bid_budget_history['day'] == date
]
daily_history.loc[:, ('day')] = datetime.strptime(date, '%Y-%m-%d').date()
daily_history.loc[:, ('day')] = datetime.datetime.strptime(
date, '%Y-%m-%d'
).date()
table_id = f'bid_budgets_{date.replace("-","")}'
try:
bq_writer.write(
Expand Down Expand Up @@ -435,8 +497,12 @@ def main():
write_disposition='WRITE_EMPTY',
)
date_range = (
(datetime.now() - timedelta(days=28)).strftime('%Y-%m-%d'),
(datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d'),
(datetime.datetime.now() - datetime.timedelta(days=28)).strftime(
'%Y-%m-%d'
),
(datetime.datetime.now() - datetime.timedelta(days=1)).strftime(
'%Y-%m-%d'
),
)
if not (
missing_dates := _get_bid_budget_snapshot_missing_dates(
Expand All @@ -457,8 +523,15 @@ def main():
bq_writer, restored_bid_budget_history, missing_dates
)

if args.cohorts:
restore_missing_cohorts(bigquery_executor, bq_dataset)
# Backfill asset cohorts only when there are recent snapshots to copy from.
if args.cohorts and (
  snapshot_dates := _get_asset_cohorts_snapshots(
    bigquery_executor, bq_dataset
  )
):
  # restore_missing_cohorts expects the dates of existing snapshots, not the
  # executor; the executor is only needed to run the generated queries.
  for table_id, query in restore_missing_cohorts(snapshot_dates, bq_dataset):
    save_restored_asset_cohort(bigquery_executor, query, table_id)


if __name__ == '__main__':
Expand Down
44 changes: 44 additions & 0 deletions tests/unit/test_backfill_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import itertools

import pandas as pd
Expand Down Expand Up @@ -85,3 +86,46 @@ def test_restore_bid_budget_history():
}
)
assert restored_change_history.equals(expected_change_history)


def test_restore_missing_cohorts_returns_correct_sql_statements():
  """Snapshots on the first and the fifth day yield a query per gap day."""
  present_days = {datetime.date(2024, 1, day) for day in (1, 5)}
  window_size = 5
  expected_statements = window_size - len(present_days)
  generated = [
    *backfill_snapshots.restore_missing_cohorts(present_days, 'test_dataset')
  ]
  assert len(generated) == expected_statements

def test_restore_missing_cohorts_without_missing_snapshots_does_nothing():
  """A gapless run of five daily snapshots produces no restore queries."""
  first_day = datetime.date(2024, 1, 1)
  present_days = {
    first_day + datetime.timedelta(days=shift) for shift in range(5)
  }
  generated = [
    *backfill_snapshots.restore_missing_cohorts(present_days, 'test_dataset')
  ]
  assert not generated

def test_restore_missing_cohorts_with_with_only_start_date_does_nothing():
  """A single snapshot on January 5th leaves nothing to restore."""
  only_day = datetime.date(2024, 1, 5)
  generated = [
    *backfill_snapshots.restore_missing_cohorts({only_day}, 'test_dataset')
  ]
  assert not generated

def test_restore_missing_cohorts_with_with_only_end_date_does_nothing():
  """A single snapshot on January 1st leaves nothing to restore."""
  only_day = datetime.date(2024, 1, 1)
  generated = [
    *backfill_snapshots.restore_missing_cohorts({only_day}, 'test_dataset')
  ]
  assert not generated

0 comments on commit 8f113d0

Please sign in to comment.