feat: Update growth stage and growth stage start date in FarmRegistry #407

Open · wants to merge 5 commits into main
112 changes: 107 additions & 5 deletions django_project/dcas/pipeline.py
@@ -10,15 +10,23 @@
import time
import numpy as np
import pandas as pd
from django.db import connection
from django.db import connection, transaction
from django.db.models import Min
import dask.dataframe as dd
from dask.dataframe.core import DataFrame as dask_df
from django.contrib.gis.db.models import Union
from sqlalchemy import create_engine

from gap.models import FarmRegistry, Grid, CropGrowthStage
from dcas.models import DCASConfig, DCASConfigCountry
from gap.models import (
FarmRegistry,
Grid,
CropGrowthStage,
Farm
)
from dcas.models import (
DCASConfig,
DCASConfigCountry,
)
from dcas.partitions import (
process_partition_total_gdd,
process_partition_growth_stage,
@@ -434,13 +442,101 @@
self.duck_db_num_threads,
meta=farm_df_meta
)
self.update_farm_registry_growth_stage()

self.data_output.save(OutputType.FARM_CROP_DATA, farm_df)

def update_farm_registry_growth_stage(self):
"""Update growth stage in FarmRegistry."""
self.data_output._setup_s3fs()

# Construct the Parquet file path based on the requested date
parquet_path = (
self.data_output._get_directory_path(
self.data_output.DCAS_OUTPUT_DIR
) + "/iso_a3=*/year=*/month=*/day=*/*.parquet"
)

# Query Parquet files in chunks using grid_data_with_crop_meta
for chunk in self.data_query.read_grid_data_crop_meta_parquet(
parquet_path
):
if chunk.empty:
continue

# Ensure required columns exist
required_columns = {
"grid_id",
"growth_stage_id",
"growth_stage_start_date"
}
missing_columns = required_columns - set(chunk.columns)
if missing_columns:
raise ValueError(
f"Missing columns in grid_crop_df: {missing_columns}"
)

# Convert `growth_stage_start_date` safely
chunk["growth_stage_start_date"] = pd.to_datetime(
chunk["growth_stage_start_date"], errors="coerce"
).dt.date

# Create mapping of `grid_id` to `farm_id`
farm_mapping = {
row["grid_id"]: row["id"]
for row in Farm.objects.values("id", "grid_id")
}

# Fetch existing FarmRegistry records using registry_id
registry_ids = list(chunk["registry_id"].dropna().unique())
existing_farm_registry = {
fr.id: fr for fr in FarmRegistry.objects.filter(
id__in=registry_ids
)
}

updates = []
for row in chunk.itertuples(index=False):
farm_id = farm_mapping.get(row.grid_id)
if farm_id:
# Ensure we're updating an existing record
farm_registry_instance = (
existing_farm_registry.get(farm_id)
)
if farm_registry_instance:
farm_registry_instance.crop_growth_stage_id = (
row.growth_stage_id
)
farm_registry_instance.growth_stage_start_date = (
row.growth_stage_start_date
)
updates.append(
farm_registry_instance
)

# Bulk update in DB
if updates:
with transaction.atomic():
FarmRegistry.objects.bulk_update(
updates, [
"crop_growth_stage_id",
"growth_stage_start_date"
]
)

def _append_grid_crop_meta(self, farm_df_meta: pd.DataFrame):
# load from grid_crop data
grid_crop_df_meta = self.data_query.read_grid_data_crop_meta_parquet(
self.data_output.grid_crop_data_dir_path
grid_crop_df_meta_chunks = (
self.data_query.read_grid_data_crop_meta_parquet(
self.data_output.grid_crop_data_dir_path
)
)
# Convert iterator to a single DataFrame
grid_crop_df_meta = pd.concat(
list(
grid_crop_df_meta_chunks
),
ignore_index=True
)

# adding new columns:
@@ -472,6 +568,12 @@
self.data_collection()
self.process_grid_crop_data()
self.process_farm_registry_data()
csv_file = self.extract_csv_output()
self.send_csv_to_sftp(csv_file)
self.cleanup_gdd_matrix()

print(f'Finished {time.time() - start_time} seconds.')

def cleanup(self):
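
The core of this change is the new `update_farm_registry_growth_stage` step above: it streams the DCAS output parquet in chunks, maps each row back onto a `FarmRegistry` record, and persists the two growth-stage fields with a single `bulk_update` per chunk. Below is a minimal, self-contained sketch of that bulk-update pattern; model and field names are taken from the diff, but the sketch keys rows directly on `registry_id` instead of going through the `Farm`/grid mapping, and the helper name is hypothetical.

```python
# Hypothetical sketch of the chunk -> bulk_update pattern used above.
# Model and field names follow the diff; the lookup is simplified to
# key on registry_id directly rather than the Farm/grid mapping.
import pandas as pd
from django.db import transaction

from gap.models import FarmRegistry


def apply_growth_stage_chunk(chunk: pd.DataFrame) -> int:
    """Apply one parquet chunk to FarmRegistry and return rows updated."""
    # Coerce invalid dates to NaT instead of raising mid-pipeline.
    chunk["growth_stage_start_date"] = pd.to_datetime(
        chunk["growth_stage_start_date"], errors="coerce"
    ).dt.date

    registry_by_id = {
        fr.id: fr
        for fr in FarmRegistry.objects.filter(
            id__in=chunk["registry_id"].dropna().unique()
        )
    }

    updates = []
    for row in chunk.itertuples(index=False):
        instance = registry_by_id.get(row.registry_id)
        if instance is None:
            continue
        instance.crop_growth_stage_id = row.growth_stage_id
        instance.growth_stage_start_date = row.growth_stage_start_date
        updates.append(instance)

    if updates:
        # bulk_update writes only the listed columns and skips per-row
        # save() calls; atomic() keeps each chunk all-or-nothing.
        with transaction.atomic():
            FarmRegistry.objects.bulk_update(
                updates,
                ["crop_growth_stage_id", "growth_stage_start_date"],
            )
    return len(updates)
```

One consequence of this design: `bulk_update` bypasses `save()` and model signals, so any side effects tied to `FarmRegistry.save()` will not fire for these rows.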
145 changes: 133 additions & 12 deletions django_project/dcas/tests/test_pipeline.py
@@ -7,6 +7,8 @@

# import uuid
# import os
from datetime import datetime
from unittest import mock
from mock import patch, MagicMock
import pandas as pd
import dask.dataframe as dd
@@ -80,15 +82,17 @@ def test_merge_grid_data_with_config_using_default(self):
expected_df['config_id'] = expected_df['config_id'].astype('Int64')
pd.testing.assert_frame_equal(df, expected_df)

def test_process_farm_registry_data(self):
"""Test running process_farm_registry_data."""
@patch("dcas.queries.DataQuery.read_grid_data_crop_meta_parquet")
def test_process_farm_registry_data(self, mock_read_parquet):
"""Test running process_farm_registry_data with chunked data."""
pipeline = DCASDataPipeline(
[self.farm_registry_group.id], self.request_date
)
conn_engine = create_engine(pipeline._conn_str())
pipeline.data_query.setup(conn_engine)

grid_crop_meta_df = pd.DataFrame({
# Mock chunked DataFrame results for local grid_crop processing
grid_crop_meta_df_1 = pd.DataFrame({
'crop_id': [1],
'crop_stage_type_id': [1],
'planting_date': ['2025-01-01'],
@@ -98,24 +102,57 @@ def test_process_farm_registry_data(self):
'temperature': [10],
'grid_crop_key': ['1_1_1']
})
pipeline.data_query.read_grid_data_crop_meta_parquet = MagicMock(
return_value=grid_crop_meta_df
)
grid_crop_meta_df_2 = pd.DataFrame({
'crop_id': [2],
'crop_stage_type_id': [1],
'planting_date': ['2025-02-01'],
'grid_id': [2],
'planting_date_epoch': [2],
'__null_dask_index__': [0],
'temperature': [12],
'grid_crop_key': ['2_1_2']
})

# Mock chunked DataFrame for S3 Parquet processing
s3_parquet_chunk_1 = pd.DataFrame({
"grid_id": [1, 2],
"registry_id": [1001, 1002],
"growth_stage_id": [10, 20],
"growth_stage_start_date": ["2024-01-01", "2024-02-15"],
})

# Ensure the correct calls return different mock data
def mock_read_parquet_side_effect(parquet_path):
if "s3://" in parquet_path:
return iter([s3_parquet_chunk_1]) # Mock S3 Parquet read
else:
return iter([grid_crop_meta_df_1, grid_crop_meta_df_2])

mock_read_parquet.side_effect = mock_read_parquet_side_effect

pipeline.data_output.save = MagicMock()

# Mock the map_partitions method
with patch.object(
dd.DataFrame, 'map_partitions', autospec=True
dd.DataFrame,
'map_partitions',
autospec=True
) as mock_map_partitions:
# Set up the mock to call the mock function
mock_map_partitions.side_effect = mock_function_do_nothing

pipeline.process_farm_registry_data()

mock_map_partitions.assert_called_once()

pipeline.data_query.read_grid_data_crop_meta_parquet.\
assert_called_once()
# Validate correct call arguments
mock_read_parquet.assert_any_call("/tmp/dcas/grid_crop")
expected_s3_path = pipeline.data_output._get_directory_path(
pipeline.data_output.DCAS_OUTPUT_DIR
) + "/iso_a3=*/year=*/month=*/day=*/*.parquet"

mock_read_parquet.assert_any_call(expected_s3_path)

# Ensure the function was called
self.assertEqual(mock_read_parquet.call_count, 2)

pipeline.data_output.save.assert_called_once()
conn_engine.dispose()

@@ -236,3 +273,87 @@ def test_process_farm_registry_data(self):
# dtype='float64'
# )
# )


@patch("gap.models.FarmRegistry.objects.bulk_update")
@patch("gap.models.FarmRegistry.objects.filter")
@patch("dcas.queries.DataQuery.read_grid_data_crop_meta_parquet")
@patch("gap.models.Farm.objects.values")
def test_update_farm_registry_growth_stage(
self,
mock_farm_values,
mock_read_parquet,
mock_farmregistry_filter,
mock_bulk_update
):
"""Test update_farm_registry_growth_stage function in DCAS Pipeline."""
# Mock Farm.objects.values() to return grid_id → farm_id mappings
mock_farm_values.return_value = [
{"grid_id": 1, "id": 1001},
{"grid_id": 2, "id": 1002},
]

# Mock read_grid_data_crop_meta_parquet to return DataFrame
mock_read_parquet.return_value = iter([
pd.DataFrame({
"grid_id": [1, 2],
"registry_id": [1001, 1002], # Ensure these match `farm_id`
"growth_stage_id": [10, 20],
"growth_stage_start_date": ["2024-01-01", "2024-02-15"],
})
])

# Mock FarmRegistry objects with same `id` as in Parquet data
farm_registry_mock_1 = mock.Mock()
farm_registry_mock_1.id = 1001
farm_registry_mock_1.crop_growth_stage_id = None
farm_registry_mock_1.growth_stage_start_date = None

farm_registry_mock_2 = mock.Mock()
farm_registry_mock_2.id = 1002
farm_registry_mock_2.crop_growth_stage_id = None
farm_registry_mock_2.growth_stage_start_date = None

# Ensure filter returns the same IDs as in Parquet
mock_farmregistry_filter.return_value = [
farm_registry_mock_1,
farm_registry_mock_2
]

# Run the method
pipeline = DCASDataPipeline(
self.farm_registry_group, self.request_date
)
pipeline.update_farm_registry_growth_stage()

# Validate `crop_growth_stage_id` and `growth_stage_start_date`
self.assertEqual(farm_registry_mock_1.crop_growth_stage_id, 10)
self.assertEqual(
farm_registry_mock_1.growth_stage_start_date,
datetime.strptime("2024-01-01", "%Y-%m-%d").date()
)
self.assertEqual(farm_registry_mock_2.crop_growth_stage_id, 20)
self.assertEqual(
farm_registry_mock_2.growth_stage_start_date,
datetime.strptime("2024-02-15", "%Y-%m-%d").date()
)

# Ensure bulk_update() was called with the correct objects
expected_updates = [farm_registry_mock_1, farm_registry_mock_2]
mock_bulk_update.assert_called_once_with(
expected_updates, [
"crop_growth_stage_id",
"growth_stage_start_date"
]
)

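
A side note on the test changes: `test_process_farm_registry_data` routes one patched `read_grid_data_crop_meta_parquet` to different chunk streams depending on the path it receives, which only works because the `side_effect` function returns a fresh iterator on every call. A stripped-down, hypothetical illustration of that routing pattern (paths and frames are made up):

```python
# Hypothetical illustration of routing a single mock by its path argument.
from unittest.mock import MagicMock

import pandas as pd

mock_read_parquet = MagicMock()


def route_by_path(parquet_path):
    # Return a *new* iterator on every call; the pipeline consumes it once.
    if "s3://" in parquet_path:
        return iter([pd.DataFrame({"grid_id": [1], "registry_id": [1001]})])
    return iter([pd.DataFrame({"crop_id": [1], "grid_id": [1]})])


mock_read_parquet.side_effect = route_by_path

s3_chunks = list(mock_read_parquet("s3://bucket/dcas/iso_a3=KEN/x.parquet"))
local_chunks = list(mock_read_parquet("/tmp/dcas/grid_crop"))
assert mock_read_parquet.call_count == 2
assert "registry_id" in s3_chunks[0].columns
```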