Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Update growth stage and growth stage start date in FarmRegistry #407

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 74 additions & 2 deletions django_project/dcas/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@
import time
import numpy as np
import pandas as pd
from django.db import connection
from django.db import connection, transaction
from django.db.models import Min
import dask.dataframe as dd
from dask.dataframe.core import DataFrame as dask_df
from django.contrib.gis.db.models import Union
from sqlalchemy import create_engine

from gap.models import FarmRegistryGroup, FarmRegistry, Grid, CropGrowthStage
from gap.models import (
FarmRegistryGroup,
FarmRegistry,
Grid,
CropGrowthStage,
Farm
)
from dcas.models import DCASConfig, DCASConfigCountry
from dcas.partitions import (
process_partition_total_gdd,
Expand Down Expand Up @@ -429,6 +435,71 @@

self.data_output.save(OutputType.FARM_CROP_DATA, farm_df)

def update_farm_registry_growth_stage(self):
"""Efficiently update growth stage in FarmRegistry."""
# Load Data
grid_crop_df = self.data_query.read_grid_data_crop_meta_parquet(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@osundwajeff this should be query to the farm parquet files like you did on error handling. It needs to accept year, month, and day from the DCASRequest.requested_at.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also need to use chunks to avoid loading all data in the memory.

self.data_output.grid_crop_data_dir_path
)

# Ensure required columns exist
required_columns = {
"grid_id",
"growth_stage_id",
"growth_stage_start_date"
}
missing_columns = required_columns - set(grid_crop_df.columns)
if missing_columns:
raise ValueError(

Check warning on line 453 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L453

Added line #L453 was not covered by tests
f"Missing columns in grid_crop_df: {missing_columns}"
)

# Convert `growth_stage_start_date` to datetime safely
grid_crop_df["growth_stage_start_date"] = pd.to_datetime(
grid_crop_df["growth_stage_start_date"], errors="coerce"
).dt.date # Convert to date format

# Get mapping of grid_id to farm_id
farm_mapping = {
row["grid_id"]: row["id"] for row in Farm.objects.values(
"id",
"grid_id"
)
}

# Find Existing FarmRegistry Records
farm_ids = list(farm_mapping.values()) # Get farm IDs
existing_farm_registry = {
fr.farm_id: fr for fr in FarmRegistry.objects.filter(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can use 'registry_id' column from the parquet to filter by 'id__in'.

farm_id__in=farm_ids
)
}

updates = []
for row in grid_crop_df.itertuples(index=False):
farm_id = farm_mapping.get(row.grid_id)
if farm_id:
# Ensure we're updating an existing record
farm_registry_instance = existing_farm_registry.get(farm_id)
if farm_registry_instance:
farm_registry_instance.crop_growth_stage_id = (
row.growth_stage_id
)
farm_registry_instance.growth_stage_start_date = (
row.growth_stage_start_date
)
updates.append(farm_registry_instance)

# Bulk Update
if updates:
with transaction.atomic():
FarmRegistry.objects.bulk_update(
updates, [
"crop_growth_stage_id",
"growth_stage_start_date"
]
)

def _append_grid_crop_meta(self, farm_df_meta: pd.DataFrame):
# load from grid_crop data
grid_crop_df_meta = self.data_query.read_grid_data_crop_meta_parquet(
Expand Down Expand Up @@ -470,6 +541,7 @@
self.process_grid_crop_data()

self.process_farm_registry_data()
self.update_farm_registry_growth_stage()

Check warning on line 544 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L544

Added line #L544 was not covered by tests
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please call this method to celery task with input of request_id from DCASRequest

csv_file = self.extract_csv_output()

self.send_csv_to_sftp(csv_file)
Expand Down
63 changes: 63 additions & 0 deletions django_project/dcas/tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

# import uuid
# import os
from unittest import mock
from mock import patch, MagicMock
import pandas as pd
import dask.dataframe as dd
Expand Down Expand Up @@ -236,3 +237,65 @@ def test_process_farm_registry_data(self):
# dtype='float64'
# )
# )


@patch("gap.models.FarmRegistry.objects.bulk_update")
@patch("gap.models.FarmRegistry.objects.filter")
@patch("dcas.queries.DataQuery.read_grid_data_crop_meta_parquet")
@patch("gap.models.Farm.objects.values")
def test_update_farm_registry_growth_stage(
self,
mock_farm_values,
mock_read_parquet,
mock_farmregistry_filter,
mock_bulk_update
):
"""Test update_farm_registry_growth_stage function in DCAS Pipeline."""
# Mock Farm.objects.values() to return grid_id → farm_id mappings
mock_farm_values.return_value = [
{"grid_id": 1, "id": "FARM-001"},
{"grid_id": 2, "id": "FARM-002"},
]

# Mock the DataFrame returned by read_grid_data_crop_meta_parquet
mock_read_parquet.return_value = pd.DataFrame({
"grid_id": [1, 2],
"growth_stage_id": [10, 20],
"growth_stage_start_date": ["2024-01-01", "2024-02-15"],
})

# Create Valid Mocked FarmRegistry Objects (with real `id` values)
farm_registry_mock_1 = mock.Mock()
farm_registry_mock_1.id = 1
farm_registry_mock_1.farm_id = "FARM-001"

farm_registry_mock_2 = mock.Mock()
farm_registry_mock_2.id = 2
farm_registry_mock_2.farm_id = "FARM-002"

# Mock `FarmRegistry.objects.filter()` to return these objects
mock_farmregistry_filter.return_value = [
farm_registry_mock_1,
farm_registry_mock_2
]

# Run the method
pipeline = DCASDataPipeline(
self.farm_registry_group, self.request_date
)
pipeline.update_farm_registry_growth_stage()

# Validate bulk_update() calls
expected_updates = [
farm_registry_mock_1,
farm_registry_mock_2,
]
mock_bulk_update.assert_called_once_with(
expected_updates, [
"crop_growth_stage_id",
"growth_stage_start_date"
]
)

# Ensure bulk_update() was called exactly once
self.assertEqual(mock_bulk_update.call_count, 1)
Loading