Skip to content

Commit

Permalink
Merge pull request #36 from ReubenFrankel/filter-collections
Browse files Browse the repository at this point in the history
feat: Filter collections
  • Loading branch information
edgarrmondragon authored Dec 13, 2024
2 parents 9aa0ff3 + ae5c785 commit de2cddd
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 4 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@ pipx install git+https://github.com/MeltanoLabs/tap-mongodb.git@main
### Accepted Config Options

| Setting | Type | Required | Default | Description |
|:-----------------------------------------|--------------|:--------:|:-----------------------------------:|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|:-----------------------------------------|------------- |:--------:|:-----------------------------------:|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| database | string | True | - | Database from which records will be extracted. |
| mongodb_connection_string | password | False | - | MongoDB connection string. See [the MongoDB documentation](https://www.mongodb.com/docs/manual/reference/connection-string/#connection-string-uri-format) for specification. The username and password included in this string must be url-encoded - the tap will not url-encode it. |
| documentdb_credential_json_string | password | False | - | JSON string with keys 'username', 'password', 'engine', 'host', 'port', 'dbClusterIdentifier' or 'dbName', 'ssl'. See example and strucure [in the AWS documentation here](https://docs.aws.amazon.com/secretsmanager/latest/userguide/reference_secret_json_structure.html#reference_secret_json_structure_docdb). The password from this JSON object will be url-encoded by the tap before opening the database connection. The intent of this setting is to enable management of an AWS DocumentDB database credential via AWS SecretsManager |
| documentdb_credential_json_extra_options | string | False | - | JSON string containing key-value pairs which will be added to the connection string options when using documentdb_credential_json_string. For example, when set to the string `{"tls":"true","tlsCAFile":"my-ca-bundle.pem"}`, the options `tls=true&tlsCAFile=my-ca-bundle.pem` will be passed to the MongoClient. |
| datetime_conversion | string | False | datetime | Parameter passed to MongoClient 'datetime_conversion' parameter. See documentation at https://pymongo.readthedocs.io/en/stable/examples/datetimes.html#handling-out-of-range-datetimes for details. The default value is 'datetime', which will throw a bson.errors.InvalidBson error if a document contains a date outside the range of datetime.MINYEAR (year 1) to datetime.MAXYEAR (9999). |
| prefix | string | False | '' | An optional prefix which will be added to the name of each stream. |
| filter_collections | string[] | False | [] | Collections to discover (default: all) - filtering is case-insensitive. Useful for improving catalog discovery performance. |
| start_date | date_iso8601 | False | 1970-01-01 | Start date - used for incremental replication only. In log-based replication mode, this setting is ignored. |
| add_record_metadata | boolean | False | False | When true, _sdc metadata fields will be added to records produced by the tap. |
| allow_modify_change_streams | boolean | False | False | In AWS DocumentDB (unlike MongoDB), change streams must be enabled specifically (see the [documentation here](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html#change_streams-enabling) ). If attempting to open a change stream against a collection on which change streams have not been enabled, an OperationFailure error will be raised. If this property is set to True, when this error is seen, the tap will execute an admin command to enable change streams and then retry the read operation. Note: this may incur new costs in AWS DocumentDB. |
| operation_types | list(string) | False | create,delete,insert,replace,update | List of MongoDB change stream operation types to include in tap output. The default behavior is to limit to document-level operation types. See full list of operation types in [the MongoDB documentation](https://www.mongodb.com/docs/manual/reference/change-events/#operation-types). Note that the list of allowed_values for this property includes some values not available to all MongoDB versions. |
| operation_types | list(string) | False | create,delete,insert,replace,update | List of MongoDB change stream operation types to include in tap output. The default behavior is to limit to document-level operation types. See full list of operation types in [the MongoDB documentation](https://www.mongodb.com/docs/manual/reference/change-events/#operation-types). Note that the list of allowed_values for this property includes some values not available to all MongoDB versions. | |

### Configure using environment variables

Expand Down
4 changes: 4 additions & 0 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ plugins:
description: |
An optional prefix which will be added to each stream name.
value: ''
- name: filter_collections
kind: array
description: |
Collections to discover (default: all). Useful for improving catalog discovery performance.
- name: start_date
kind: date_iso8601
description: |
Expand Down
25 changes: 23 additions & 2 deletions tap_mongodb/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
MongoVersion = Tuple[int, int]


class MongoDBConnector:
class MongoDBConnector: # pylint: disable=too-many-instance-attributes
"""MongoDB/DocumentDB connector class"""

def __init__( # pylint: disable=too-many-arguments
Expand All @@ -30,12 +30,14 @@ def __init__( # pylint: disable=too-many-arguments
db_name: str,
datetime_conversion: str,
prefix: Optional[str] = None,
collections: Optional[List[str]] = None,
) -> None:
self._connection_string = connection_string
self._options = options
self._db_name = db_name
self._datetime_conversion: str = datetime_conversion.upper()
self._prefix: Optional[str] = prefix
self._collections = collections
self._logger: Logger = getLogger(__name__)
self._version: Optional[MongoVersion] = None

Expand Down Expand Up @@ -110,7 +112,26 @@ def discover_catalog_entries(self) -> List[Dict[str, Any]]:
The discovered catalog entries as a list.
"""
result: List[Dict] = []
for collection in self.database.list_collection_names(authorizedCollections=True, nameOnly=True):

collections = self.database.list_collection_names(
authorizedCollections=True,
nameOnly=True,
filter={
"$or": [
{
"name": {
"$regex": f"^{c}$",
"$options": "i",
}
}
for c in self._collections
]
}
if self._collections
else None,
)

for collection in collections:
try:
self.database[collection].find_one()
except PyMongoError:
Expand Down
11 changes: 11 additions & 0 deletions tap_mongodb/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ class TapMongoDB(Tap):
default="",
description="An optional prefix which will be added to each stream name.",
),
th.Property(
"filter_collections",
th.ArrayType(th.StringType),
required=True,
default=[],
description=(
"Collections to discover (default: all) - filtering is case-insensitive. Useful for improving catalog "
"discovery performance."
),
),
th.Property(
"start_date",
th.DateTimeType,
Expand Down Expand Up @@ -206,6 +216,7 @@ def connector(self) -> MongoDBConnector:
self.config.get("database"),
self.config.get("datetime_conversion"),
prefix=self.config.get("prefix", None),
collections=self.config["filter_collections"],
)

@property
Expand Down

0 comments on commit de2cddd

Please sign in to comment.