From 51be0663e8f4aa67bdf51017d16301b68fe0f7ee Mon Sep 17 00:00:00 2001 From: Raymond Yee Date: Sat, 13 Jun 2026 17:22:27 -0700 Subject: [PATCH] OC sync to current export (#272) + search/facet fixes (#277/#283) -> isamples_202608 Cuts the explorer over from 202606 to the 202608 build and adds the reproducible pipeline that produced it. Data (verified, no regressions vs 202606): - OC synced to Eric's current export: +67,187 new records (incl. Tall al-Umayri Jordan lithics) / -21,227 stale Murlo re-IDs (his Option B). OC count 1,110,791 (= his wide); material/1.0/rock 37,953 (his number). - Other sources (SESAR/GEOME/Smithsonian) byte-identical. Fixes: - #277: OC site-path descriptions restored (Cyprus search 0 -> 69,230); concept labels now searchable (pottery Cyprus 0 -> 1,305). - #283a: blank Sampled-Feature facet removed. #283b: specimentype/1.0 labels. - #260/#265: OC material/object-type corrected via the synced concepts. Pipeline hardening (8 build rounds, 5 Codex reviews): - ingest_oc_records.py: fixpoint orphan removal; in-script dangling-ref gate over all 12 reference columns; hard silent-drop guard; keyword minting. - validate_frontend_derived.py: --wide semantic gate shares the facets description expression with the builder (cannot drift). - 70/70 pipeline tests pass; --wide trust gate green. Data files already published to R2 (data.isamples.org), uniquely named; this is the explorer cutover. vocab_labels_202608 keeps prod vocab_labels untouched. 
Co-Authored-By: Claude Fable 5 --- Makefile | 27 +- explorer.qmd | 24 +- scripts/build_frontend_derived.py | 79 +- scripts/build_vocab_labels.py | 38 + scripts/ingest_oc_records.py | 1207 ++++++++++++++++++ scripts/validate_frontend_derived.py | 18 +- tests/test_frontend_derived.py | 6 +- tests/test_ingest_oc_records.py | 1683 ++++++++++++++++++++++++++ 8 files changed, 3055 insertions(+), 27 deletions(-) create mode 100644 scripts/ingest_oc_records.py create mode 100644 tests/test_ingest_oc_records.py diff --git a/Makefile b/Makefile index b4fff9c6..8f2f152a 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ VALIDATE := scripts/validate_frontend_derived.py ENRICH := scripts/enrich_wide_with_oc_concepts.py VALIDATE_ENRICH := scripts/validate_oc_concept_enrichment.py -.PHONY: help test wide oc-wide enrich validate-enrich derived validate all all-272 clean +.PHONY: help test wide oc-wide enrich validate-enrich derived validate all all-272 ingest-272 all-202608 clean help: @grep -E '^# make' Makefile | sed 's/^# / /' @@ -81,5 +81,30 @@ all-272: validate-enrich $(MAKE) derived DERIVED_WIDE=$(ENRICHED) TAG=$(TAG) $(MAKE) validate TAG=$(TAG) SENTINEL_FLAG= +# TRUE SYNC ingestion: add 67,187 new OC pids + remove 21,227 stale OC pids (#272 Phase 2, D3). +# Requires the 202606 wide (--src) and Eric's OC wide (--oc-wide). +# Outputs a 202608-tagged wide parquet + derived files in $(OUTDIR). 
+# +# make ingest-272 \ +# SRC_202606=~/Data/iSample/pqg_refining/isamples_202606_wide.parquet \ +# OC_WIDE_2026=~/Data/iSample/pqg_refining/oc_isamples_pqg_wide_2026-06-09.parquet +# +INGEST_TAG ?= isamples_202608 +SRC_202606 ?= $(OUTDIR)/isamples_202606_wide.parquet +OC_WIDE_2026 ?= $(OUTDIR)/oc_isamples_pqg_wide_2026-06-09.parquet +INGEST_OUT ?= $(OUTDIR)/$(INGEST_TAG)_wide.parquet +INGEST := scripts/ingest_oc_records.py + +$(INGEST_OUT): $(SRC_202606) $(OC_WIDE_2026) + @mkdir -p $(OUTDIR) + $(PY) $(INGEST) --src $(SRC_202606) --oc-wide $(OC_WIDE_2026) --out $(INGEST_OUT) + +ingest-272: $(INGEST_OUT) + +# Full 202608 pipeline: ingest -> derived -> validate +all-202608: ingest-272 + $(MAKE) derived DERIVED_WIDE=$(INGEST_OUT) TAG=$(INGEST_TAG) + $(MAKE) validate TAG=$(INGEST_TAG) SENTINEL_FLAG= + clean: rm -rf $(OUTDIR) diff --git a/explorer.qmd b/explorer.qmd index 80620db7..08b7feaa 100644 --- a/explorer.qmd +++ b/explorer.qmd @@ -12,9 +12,9 @@ format: include-in-header: text: | - - - + + + --- ```{=html} @@ -749,23 +749,23 @@ R2_BASE = (() => { // default and absolute overrides (http://localhost:8099/data) pass through. return raw.startsWith('/') ? new URL(raw, location.origin).href : raw; })() -h3_res4_url = `${R2_BASE}/isamples_202606_h3_summary_res4.parquet` -h3_res6_url = `${R2_BASE}/isamples_202606_h3_summary_res6.parquet` -h3_res8_url = `${R2_BASE}/isamples_202606_h3_summary_res8.parquet` -lite_url = `${R2_BASE}/isamples_202606_samples_map_lite.parquet` +h3_res4_url = `${R2_BASE}/isamples_202608_h3_summary_res4.parquet` +h3_res6_url = `${R2_BASE}/isamples_202608_h3_summary_res6.parquet` +h3_res8_url = `${R2_BASE}/isamples_202608_h3_summary_res8.parquet` +lite_url = `${R2_BASE}/isamples_202608_samples_map_lite.parquet` // Explicit versioned wide (#272: OC concept-enriched — popups read material/ // object-type from this file). 
The stable alias `current/wide.parquet` still // points at the previous wide until the production cutover flips the manifest; // pinning the version here keeps staging and prod each self-consistent. -wide_url = `${R2_BASE}/isamples_202606_wide.parquet` +wide_url = `${R2_BASE}/isamples_202608_wide.parquet` // v2 carries object_type alongside material and context (URI-string columns). -facets_url = `${R2_BASE}/isamples_202606_sample_facets_v2.parquet` -facet_summaries_url = `${R2_BASE}/isamples_202606_facet_summaries.parquet` +facets_url = `${R2_BASE}/isamples_202608_sample_facets_v3.parquet` +facet_summaries_url = `${R2_BASE}/isamples_202608_facet_summaries.parquet` // Pre-aggregated single-filter cache for fast cross-filtered facet counts. -cross_filter_url = `${R2_BASE}/isamples_202606_facet_cross_filter.parquet` +cross_filter_url = `${R2_BASE}/isamples_202608_facet_cross_filter.parquet` // SKOS prefLabels for Material / Sampled Feature / Specimen Type URIs. // ~60 KB lookup; falls back to URI tail if a URI isn't covered. -vocab_labels_url = `${R2_BASE}/vocab_labels.parquet` +vocab_labels_url = `${R2_BASE}/vocab_labels_202608.parquet` // Canonical palette — see issue #113. Path-relative so this works under // both isamples.org (custom domain at root) and project-pages fork diff --git a/scripts/build_frontend_derived.py b/scripts/build_frontend_derived.py index b4f10391..dd29addd 100755 --- a/scripts/build_frontend_derived.py +++ b/scripts/build_frontend_derived.py @@ -13,7 +13,7 @@ live in `p__has_{material,context,sample_object}_category` row-id arrays. 
OUTPUTS (into --outdir, prefixed --tag): - - {tag}_sample_facets_v2.parquet pid, source, material, context, object_type, label, description, place_name(VARCHAR) + - {tag}_sample_facets_v2.parquet pid, source, material, context, object_type, label, description (search-only; includes appended concept labels), place_name(VARCHAR) - {tag}_samples_map_lite.parquet pid, label, source, latitude, longitude, place_name(VARCHAR[]), result_time, h3_res8(UBIGINT), h3_res8_hex - {tag}_h3_summary_res{4,6,8}.parquet h3_cell(UBIGINT), sample_count(INT), center_lat, center_lng, dominant_source, source_count(INT), resolution(INT) - {tag}_facet_summaries.parquet facet_type, facet_value, scheme, count @@ -48,6 +48,21 @@ ARTIFACTS = ["sample_facets_v2", "samples_map_lite", "h3_summaries", "facet_summaries", "facet_cross_filter", "wide_h3"] +# Shared SQL expression for sample_facets_v2.description (#277 part 2). +# Appends space-joined concept labels (IC labels across all 4 concept dims) +# to the raw description so full-text search matches concept terms even when +# they don't appear in label/description/place_name. description is +# SEARCH-ONLY in facets_v2 — display reads from the wide parquet. +# Used by build_sample_facets_v2 AND the validator's --wide semantic gate so +# they can never drift from each other. +FACETS_DESCRIPTION_EXPR = ( + "CASE" + " WHEN concept_labels IS NOT NULL AND TRIM(concept_labels) != ''" + " THEN COALESCE(description, '') || ' ' || concept_labels" + " ELSE description" + " END" +) + def log(msg, t0): print(f"[{time.time()-t0:6.1f}s] {msg}", flush=True) @@ -77,8 +92,12 @@ def geometry_expr(con, wide): def build_base_tables(con, wide, t0): geom = geometry_expr(con, wide) con.execute(f""" + -- ic: concept lookup for facet resolution and label aggregation. + -- label is included so concept_labels can aggregate human-readable text + -- directly from the wide without a second scan. 
CREATE OR REPLACE TEMP TABLE ic AS - SELECT row_id, pid AS uri FROM read_parquet('{wide}') WHERE otype='IdentifiedConcept'; + SELECT row_id, pid AS uri, label + FROM read_parquet('{wide}') WHERE otype='IdentifiedConcept'; -- material: FIRST NON-ROOT concept per sample. Decorrelated (unnest+join+ -- arg_min by array ordinality) — NOT a correlated subquery and NOT a MAP @@ -95,6 +114,39 @@ def build_base_tables(con, wide, t0): WHERE ic.uri <> '{MATERIAL_ROOT}' GROUP BY ex.pid; + -- concept_labels: one row per MSR pid; concept_labels is a space-joined + -- string of all DISTINCT non-null IC labels referenced across + -- p__has_material_category, p__has_sample_object_type, + -- p__has_context_category, and p__keywords. Appended (search-only) into + -- sample_facets_v2.description so full-text searches like "pottery cyprus" + -- match samples tagged with a pottery concept even if the word doesn't + -- appear in their label/description/place_name. facets_v2.description is + -- SEARCH-ONLY; display always reads description from the wide parquet. 
+ CREATE OR REPLACE TEMP TABLE concept_labels AS + WITH all_refs AS ( + SELECT s.pid, u.rid + FROM read_parquet('{wide}') s, UNNEST(s.p__has_material_category) AS u(rid) + WHERE s.otype='MaterialSampleRecord' + UNION ALL + SELECT s.pid, u.rid + FROM read_parquet('{wide}') s, UNNEST(s.p__has_sample_object_type) AS u(rid) + WHERE s.otype='MaterialSampleRecord' + UNION ALL + SELECT s.pid, u.rid + FROM read_parquet('{wide}') s, UNNEST(s.p__has_context_category) AS u(rid) + WHERE s.otype='MaterialSampleRecord' + UNION ALL + SELECT s.pid, u.rid + FROM read_parquet('{wide}') s, UNNEST(s.p__keywords) AS u(rid) + WHERE s.otype='MaterialSampleRecord' + ) + SELECT r.pid, + string_agg(DISTINCT ic.label, ' ' ORDER BY ic.label) AS concept_labels + FROM all_refs r + JOIN ic ON ic.row_id = r.rid + WHERE ic.label IS NOT NULL AND TRIM(ic.label) != '' + GROUP BY r.pid; + -- one row per MaterialSampleRecord; all concept resolution via JOINs (decorrelated). CREATE OR REPLACE TEMP TABLE samp AS SELECT @@ -108,11 +160,13 @@ def build_base_tables(con, wide, t0): ROUND(ST_X({geom}), 6) AS longitude, mat.material AS material, ctx.uri AS context, - obj.uri AS object_type + obj.uri AS object_type, + cl.concept_labels AS concept_labels FROM read_parquet('{wide}') s LEFT JOIN mat ON mat.pid = s.pid LEFT JOIN ic AS ctx ON ctx.row_id = s.p__has_context_category[1] LEFT JOIN ic AS obj ON obj.row_id = s.p__has_sample_object_type[1] + LEFT JOIN concept_labels cl ON cl.pid = s.pid WHERE s.otype='MaterialSampleRecord'; CREATE OR REPLACE TEMP TABLE samp_geo AS @@ -136,8 +190,19 @@ def build_base_tables(con, wide, t0): def build_sample_facets_v2(con, out): + # description is SEARCH-ONLY in sample_facets_v2: the explorer reads + # description for display from the wide parquet (self-join on pid), never + # from facets_v2. 
We append the space-joined concept labels of every + # IdentifiedConcept referenced by this sample (p__has_material_category, + # p__has_sample_object_type, p__has_context_category, p__keywords) so that + # full-text searches like "pottery cyprus" match samples tagged with a pottery + # concept even when the word doesn't appear in label/description/place_name. + # The wide's IdentifiedConcept.label is used directly (covers minted keyword + # concepts such as British Museum thesaurus terms that are absent from + # vocab_labels.parquet). See issue #277 part 2. con.execute(f"""COPY ( - SELECT pid, source, material, context, object_type, label, description, + SELECT pid, source, material, context, object_type, label, + {FACETS_DESCRIPTION_EXPR} AS description, place_name::VARCHAR AS place_name FROM samp_geo ORDER BY pid ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") @@ -185,7 +250,7 @@ def build_h3_summary(con, out, res): def build_facet_summaries(con, out): union = " UNION ALL ".join( - f"SELECT '{d}' AS facet_type, {d} AS facet_value FROM samp_geo WHERE {d} IS NOT NULL" + f"SELECT '{d}' AS facet_type, {d} AS facet_value FROM samp_geo WHERE NULLIF(TRIM({d}), '') IS NOT NULL" for d in FACET_DIMS) con.execute(f"""COPY ( SELECT facet_type, facet_value, NULL::INTEGER AS scheme, COUNT(*) AS count @@ -206,7 +271,7 @@ def build_facet_cross_filter(con, out): f"SELECT NULL::VARCHAR AS filter_source, NULL::VARCHAR AS filter_material, " f"NULL::VARCHAR AS filter_context, NULL::VARCHAR AS filter_object_type, " f"'{fd}' AS facet_type, {fd} AS facet_value, COUNT(*) AS count " - f"FROM samp_geo WHERE {fd} IS NOT NULL GROUP BY {fd}") + f"FROM samp_geo WHERE NULLIF(TRIM({fd}), '') IS NOT NULL GROUP BY {fd}") for filt in FACET_DIMS: for fd in FACET_DIMS: cols = ", ".join( @@ -214,7 +279,7 @@ def build_facet_cross_filter(con, out): for c in FACET_DIMS) selects.append( f"SELECT {cols}, '{fd}' AS facet_type, {fd} AS facet_value, COUNT(*) AS count " - f"FROM samp_geo WHERE {filt} IS 
NOT NULL AND {fd} IS NOT NULL GROUP BY {filt}, {fd}") + f"FROM samp_geo WHERE NULLIF(TRIM({filt}), '') IS NOT NULL AND NULLIF(TRIM({fd}), '') IS NOT NULL GROUP BY {filt}, {fd}") con.execute(f"""COPY ( SELECT filter_source, filter_material, filter_context, filter_object_type, facet_type, facet_value, count diff --git a/scripts/build_vocab_labels.py b/scripts/build_vocab_labels.py index 4bd7c9fe..d003d2f6 100644 --- a/scripts/build_vocab_labels.py +++ b/scripts/build_vocab_labels.py @@ -66,6 +66,27 @@ PREFERRED_LANG = "en" +# Deprecated / legacy concept URIs that are absent from the live SKOS TTLs but +# still appear in older source data (e.g. SESAR records using the specimentype/1.0 +# namespace, superseded by materialsampleobjecttype/1.0). These rows are injected +# directly so the Explorer can display human-readable labels instead of raw URI +# path tails. Each entry: (uri, pref_label, lang, scheme). +# Issue #283b: 169 SESAR records carry these deprecated URIs. +MANUAL_LABEL_OVERRIDES: list[tuple[str, str, str, str | None]] = [ + ( + "https://w3id.org/isample/vocabulary/specimentype/1.0/othersolidobject", + "Other solid object", + "en", + "https://w3id.org/isample/vocabulary/specimentype/1.0/", + ), + ( + "https://w3id.org/isample/vocabulary/specimentype/1.0/physicalspecimen", + "Material sample", + "en", + "https://w3id.org/isample/vocabulary/specimentype/1.0/", + ), +] + # When a concept URI is declared in more than one TTL, prefer the row whose # source TTL's URL contains one of these path fragments. The fragments are # matched against the concept URI: a URI containing "vocabulary/material/" @@ -286,6 +307,23 @@ def main(argv: list[str] | None = None) -> int: print("ERROR: no rows extracted; aborting.", file=sys.stderr) return 2 + # Inject manual overrides for deprecated URIs not present in any live TTL. 
+ # These are appended before dedupe so _dedupe can merge them if they ever + # appear in a future TTL revision, and so _emit_data_form_aliases does NOT + # re-emit them (they already carry the /1.0/ version segment). + for uri, label, lang, scheme in MANUAL_LABEL_OVERRIDES: + all_rows.append({ + "uri": uri, + "uri_form": "data_v1", # already in the /1.0/ data form + "pref_label": label, + "lang": lang, + "scheme": scheme, + "definition": None, + "alt_labels": [], + "source_ttl": "manual_override", + }) + print(f" {len(MANUAL_LABEL_OVERRIDES):>4} rows (manual overrides for deprecated URIs)") + raw_count = len(all_rows) all_rows = _dedupe(all_rows) deduped_collapsed = raw_count - len(all_rows) diff --git a/scripts/ingest_oc_records.py b/scripts/ingest_oc_records.py new file mode 100644 index 00000000..70f92502 --- /dev/null +++ b/scripts/ingest_oc_records.py @@ -0,0 +1,1207 @@ +#!/usr/bin/env python3 +"""TRUE SYNC: ingest new OpenContext records + remove stale OC records. + +Issue #272 Phase 2 (follow-up to PR #275 overlay phase): + The overlay phase (Phase 1) fixed concept mappings for ~1.04M existing OC pids. + This script performs a TRUE SYNC against Eric's 2026-06-09 OC PQG wide: + ADD 67,187 new pids (in Eric's wide but not in src) + REMOVE 21,227 stale pids (in src but not in Eric's wide — Murlo project + mass-updated PIDs; old PIDs would duplicate the same physical samples) + + remove orphaned subgraph entities for the removed pids. + +Decision D3 (2026-06-12, RY): REMOVE the stale pids. Rationale: OpenContext +mass-updated Murlo project PIDs; keeping old pids would duplicate the same +physical samples under two identifiers. This is a TRUE SYNC. + +WHAT IT DOES (single DuckDB session, deterministic): + 1. Identify stale pids (src has, Eric doesn't) → rows to remove + 2. Identify orphan subgraph entities (SE / Geo / Site) only referenced by + removed MSRs — safe to remove; agents are shared so NOT removed. + 3. 
Identify new pids (Eric has, src doesn't) → rows to add + 4. Extract full entity subgraph for new pids: + MaterialSampleRecord + SamplingEvent + GeospatialCoordLocation + + SamplingSite + Agent + (linked IdentifiedConcepts already in src) + 5. Assign new row_ids: dense rank starting at max(src.row_id)+1, + ordered deterministically by (otype, pid). + 6. Build a mapping table: Eric's row_id → our new row_id. + 7. Remap all p__ arrays on new rows from Eric's id space to our id space. + Concept refs in p__has_* arrays resolved via URI lookup against src's + IdentifiedConcept rows (same pattern as enrich_wide_with_oc_concepts.py). + 8. Denormalize geometry/lat/lon from GeoCoordLoc onto new MSR rows + (builder reads geometry from MSR rows, not from GeoCoordLoc). + 9. Set n='OPENCONTEXT' on new MSR rows (Eric's wide has NULL). + 10. Mint new IdentifiedConcept rows for any concept URIs in new MSRs but + absent from src wide (expected: sampledfeature/1.0/earthsurface). + 11. Hard-fail checks before writing (see HARD FAILURES below). + 12. Write: (src - removed - orphans) UNION ALL new_entities → output wide. + 13. Emit a {out}.manifest.json. + +WHAT IT DOES NOT DO (scope): + - Does not re-run the Phase 1 concept overlay (already in src wide). + - Does not populate p__curation / p__related_resource (OC doesn't have them). 
+ +HARD FAILURES (refuses to write): + - duplicate pids among new MSRs (new pid set must be truly new) + - any new pid already exists in src wide (ingestion grain wrong) + - duplicate row_ids in proposed new id set vs src wide + - any p__ reference in a new row that cannot be resolved to a row_id in output + - any new MSR with n != 'OPENCONTEXT' in the written output + - row count mismatch: output != (src - removed) + new_entities + minted_concepts + - duplicate pids anywhere in output (union would create them if logic is wrong) + - any removed pid still present in output + +Usage: + python scripts/ingest_oc_records.py \\ + --src isamples_202606_wide.parquet \\ + --oc-wide oc_isamples_pqg_wide_2026-06-09.parquet \\ + --out isamples_202608_wide.parquet + + # Dry-run (skips writing, just runs analysis + trust checks): + python scripts/ingest_oc_records.py --src ... --oc-wide ... --out ... --dry-run + +Notes: + - DuckDB pinned to 1.4.4 (scripts/requirements.txt). h3 + spatial extensions + installed at runtime (needed for geometry handling). + - Use the 202606 wide as --src (not 202604). Phase 1 (PR #275) minted the + otheranthropogenicmaterial concept in 202606; using 202604 would require + minting it again and risks id collision with Phase 1. + - The --src wide's row_id column must be BIGINT (our convention). Eric's wide + uses INTEGER — new rows cast to BIGINT automatically. 
+""" +import argparse +import hashlib +import json +import os +import subprocess +import sys +import time + +import duckdb + +# Dimensions where concept references live on MSR rows +CONCEPT_DIMS = [ + "p__has_material_category", + "p__has_sample_object_type", + "p__has_context_category", +] + +# Columns present in our wide but absent from Eric's wide (will be NULL in new rows) +OUR_ONLY_COLS = ["p__curation", "p__related_resource"] + +# The source attribution for all OC records in the unified wide +OC_SOURCE = "OPENCONTEXT" + + +def sha256_file(path, _bufsize=1 << 20): + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(_bufsize), b""): + h.update(chunk) + return h.hexdigest() + + +def git_sha(): + try: + return subprocess.check_output( + ["git", "rev-parse", "HEAD"], + cwd=os.path.dirname(os.path.abspath(__file__)), + stderr=subprocess.DEVNULL, + ).decode().strip() + except Exception: + return None + + +def log(msg, t0): + print(f"[{time.time()-t0:6.1f}s] {msg}", flush=True) + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--src", required=True, + help="Source unified wide parquet (should be isamples_202606_wide.parquet " + "so Phase-1 concept minting is already present)") + ap.add_argument("--oc-wide", required=True, + help="Eric's OC PQG wide parquet (oc_isamples_pqg_wide_2026-06-09.parquet)") + ap.add_argument("--out", required=True, + help="Output wide parquet (e.g. 
isamples_202608_wide.parquet)") + ap.add_argument("--dry-run", action="store_true", + help="Run analysis and trust checks, do not write output") + ap.add_argument("--no-manifest", action="store_true") + args = ap.parse_args() + + for fp in (args.src, args.oc_wide): + if not os.path.exists(fp): + sys.exit(f"FATAL: missing input {fp}") + if not args.dry_run: + if os.path.abspath(args.out) in (os.path.abspath(args.src), + os.path.abspath(args.oc_wide)): + sys.exit("FATAL: --out must not overwrite an input") + os.makedirs(os.path.dirname(os.path.abspath(args.out)), exist_ok=True) + + t0 = time.time() + con = duckdb.connect() + con.execute("INSTALL h3 FROM community; LOAD h3; INSTALL spatial; LOAD spatial;") + + SRC = f"read_parquet('{args.src}')" + OC = f"read_parquet('{args.oc_wide}')" + + # ---- schema contract checks ------------------------------------------- + src_cols_raw = con.sql(f"DESCRIBE SELECT * FROM {SRC}").fetchall() + src_cols = [(r[0], r[1]) for r in src_cols_raw] + src_colnames = [c for c, _ in src_cols] + + oc_cols_raw = con.sql(f"DESCRIBE SELECT * FROM {OC}").fetchall() + oc_colnames = [r[0] for r in oc_cols_raw] + + # Verify concept dim columns exist in both + for d in CONCEPT_DIMS: + if d not in src_colnames: + sys.exit(f"FATAL: src wide lacks required column {d}") + if d not in oc_colnames: + sys.exit(f"FATAL: oc-wide lacks required column {d}") + + # Verify p__produced_by exists (coord path) + for col in ("p__produced_by",): + if col not in oc_colnames: + sys.exit(f"FATAL: oc-wide lacks required column {col}") + + log("schema checks passed", t0) + + # ---- grain checks (hard-fail before any writing) ----------------------- + n_dup_src_rowid = con.sql( + f"SELECT COUNT(*) FROM (SELECT row_id FROM {SRC} GROUP BY row_id HAVING COUNT(*)>1)" + ).fetchone()[0] + n_dup_oc_pid_msr = con.sql( + f"SELECT COUNT(*) FROM (SELECT pid FROM {OC} WHERE otype='MaterialSampleRecord' " + f"GROUP BY pid HAVING COUNT(*)>1)" + ).fetchone()[0] + if n_dup_src_rowid or 
n_dup_oc_pid_msr: + sys.exit( + f"FATAL: non-unique keys — src duplicate row_ids={n_dup_src_rowid}, " + f"OC duplicate MSR pids={n_dup_oc_pid_msr}. Refusing to proceed." + ) + + # ---- Phase D3: identify stale pids to remove --------------------------- + con.execute(f""" + CREATE TEMP TABLE removed_pids AS + SELECT pid + FROM {SRC} WHERE otype='MaterialSampleRecord' AND n='{OC_SOURCE}' + EXCEPT + SELECT pid + FROM {OC} WHERE otype='MaterialSampleRecord'; + """) + n_removed_pids = con.sql("SELECT COUNT(*) FROM removed_pids").fetchone()[0] + log(f"stale pids to remove: {n_removed_pids:,}", t0) + + # ---- Phase D3 orphan analysis: FIXPOINT general orphan removal ---------- + # TRUE GENERAL FORMULATION (no path enumeration): + # + # A candidate row (any non-MSR entity reachable from the 21K removed MSRs' + # subgraph) is removed iff its row_id is NOT referenced by ANY surviving row + # through ANY p__* array column. We iterate to fixpoint because an orphan + # candidate may itself be the sole reference-holder of another candidate + # (e.g. an orphan SamplingSite → orphan Geo). + # + # Algorithm: + # remove_set := row_ids of the stale MSRs + # repeat until remove_set stops growing: + # survivor_refs := DISTINCT union of every p__* array column, UNNESTed, + # over all rows WHERE row_id NOT IN remove_set + # candidates := rows in the removed-MSR subgraph not already in remove_set + # new_orphans := candidates WHERE row_id NOT IN survivor_refs + # remove_set := remove_set UNION new_orphans + # + # The graph is shallow (~3 hops: MSR→SE→Geo/Site→Geo) so fixpoint is reached + # in ≤4 passes. We enumerate the p__* columns from the schema — no guesswork. + + # Collect all p__* columns that carry BIGINT[] or INTEGER[] row_id references. 
+ p_ref_cols = [ + col for col, typ in src_cols + if col.startswith("p__") + and any(t in typ.upper() for t in ("BIGINT", "INTEGER")) + ] + log(f"fixpoint orphan: p__* ref cols = {p_ref_cols}", t0) + + # Build the subgraph of candidates: all non-MSR rows reachable (transitively) + # from the removed MSRs through their p__* arrays. + # We do this in one pass: any row whose row_id appears in ANY p__* array of + # the removed MSRs (or their descendants) is a candidate. + # We use a wide-first BFS: first pass collects direct refs from removed MSRs, + # subsequent passes follow refs from newly discovered candidates. + # For the iSamples graph (depth ≤3) three passes always reach fixpoint. + con.execute(f""" + -- Seed remove_set with the stale MSR row_ids + CREATE TEMP TABLE remove_set AS + SELECT s.row_id + FROM {SRC} s + WHERE s.otype='MaterialSampleRecord' AND s.n='{OC_SOURCE}' + AND s.pid IN (SELECT pid FROM removed_pids); + """) + + pass_num = 0 + while True: + pass_num += 1 + + # -- Step 1: compute survivor_refs: all row_ids referenced by surviving rows + # (rows NOT in remove_set) through ANY p__* reference column. + # Build a UNION ALL of unnest() over each p__* col, filter to surviving rows. + survivor_union = "\n UNION ALL\n ".join( + f"SELECT unnest(w.{col}) AS ref_id" + f" FROM {SRC} w" + f" WHERE w.{col} IS NOT NULL AND len(w.{col}) > 0" + f" AND w.row_id NOT IN (SELECT row_id FROM remove_set)" + for col in p_ref_cols + ) + con.execute(f""" + CREATE OR REPLACE TEMP TABLE survivor_refs_cur AS + SELECT DISTINCT ref_id + FROM ({survivor_union}) t + WHERE ref_id IS NOT NULL + """) + + # -- Step 2: compute the candidate subgraph reachable from remove_set rows. + # Any non-MSR row whose row_id appears in any p__* array of any row in + # remove_set is a candidate for orphan-deletion. 
+ candidate_union = "\n UNION ALL\n ".join( + f"SELECT unnest(r2.{col}) AS row_id" + f" FROM {SRC} r2" + f" WHERE r2.row_id IN (SELECT row_id FROM remove_set)" + f" AND r2.{col} IS NOT NULL" + for col in p_ref_cols + ) + con.execute(f""" + CREATE OR REPLACE TEMP TABLE candidates_cur AS + SELECT DISTINCT row_id + FROM ({candidate_union}) t + WHERE row_id IS NOT NULL + """) + + # -- Step 3: new orphans = candidates NOT in survivor_refs AND NOT already removed + # AND NOT an MSR (we only auto-remove non-MSR entities; MSRs handled explicitly above) + # AND NOT an IdentifiedConcept (vocabulary concept rows are shared across all sources + # and must never be deleted just because one MSR is removed). + new_orphan_ids = con.execute(f""" + SELECT s.row_id + FROM {SRC} s + JOIN candidates_cur c ON c.row_id = s.row_id + WHERE s.otype != 'MaterialSampleRecord' + AND s.otype != 'IdentifiedConcept' + AND s.row_id NOT IN (SELECT row_id FROM remove_set) + AND s.row_id NOT IN (SELECT ref_id FROM survivor_refs_cur) + """).fetchall() + + n_new = len(new_orphan_ids) + log(f"fixpoint pass {pass_num}: {n_new} new orphans", t0) + if n_new == 0: + con.execute("DROP TABLE IF EXISTS survivor_refs_cur") + con.execute("DROP TABLE IF EXISTS candidates_cur") + break + + # Insert the new orphans into remove_set + new_ids_csv = ",".join(str(r[0]) for r in new_orphan_ids) + con.execute(f""" + INSERT INTO remove_set + SELECT DISTINCT row_id FROM {SRC} + WHERE row_id IN ({new_ids_csv}) + AND row_id NOT IN (SELECT row_id FROM remove_set) + """) + + n_rows_to_remove = con.sql("SELECT COUNT(*) FROM remove_set").fetchone()[0] + log(f"fixpoint done in {pass_num} passes: rows_to_remove={n_rows_to_remove:,}", t0) + + # Sanity: verify no non-OC MSR rows crept into remove_set + n_non_oc_in_remove = con.sql(f""" + SELECT COUNT(*) FROM remove_set rs + JOIN {SRC} s ON s.row_id = rs.row_id + WHERE s.otype='MaterialSampleRecord' AND s.n != '{OC_SOURCE}' + """).fetchone()[0] + if n_non_oc_in_remove: + 
sys.exit(f"FATAL: fixpoint orphan put {n_non_oc_in_remove} non-OC MSR rows in remove_set") + + # For logging: count by otype + otype_counts = con.sql(f""" + SELECT s.otype, COUNT(*) AS n + FROM remove_set rs JOIN {SRC} s ON s.row_id=rs.row_id + GROUP BY s.otype ORDER BY s.otype + """).fetchall() + orphan_counts = {ot: n for ot, n in otype_counts} + n_removed_msrs_actual = orphan_counts.get("MaterialSampleRecord", 0) + if n_removed_msrs_actual != n_removed_pids: + sys.exit(f"FATAL: remove_set has {n_removed_msrs_actual} MSR rows but expected {n_removed_pids}") + log(f"orphan subgraph by otype: {orphan_counts}", t0) + + # Alias rows_to_remove for compatibility with downstream SQL + con.execute("CREATE TEMP TABLE rows_to_remove AS SELECT row_id FROM remove_set") + total_orphan_rows = n_rows_to_remove + + # ---- Phase A: identify new pids ---------------------------------------- + con.execute(f""" + CREATE TEMP TABLE new_pids AS + SELECT pid + FROM {OC} WHERE otype='MaterialSampleRecord' + EXCEPT + SELECT pid + FROM {SRC} WHERE otype='MaterialSampleRecord' AND n='{OC_SOURCE}'; + """) + n_new_pids = con.sql("SELECT COUNT(*) FROM new_pids").fetchone()[0] + log(f"new pids: {n_new_pids:,}", t0) + if n_new_pids == 0: + sys.exit("INFO: no new pids to ingest. Output would be identical to src minus removals. Exiting.") + + # Check none of the new pids sneak in as non-OPENCONTEXT records in src + n_pid_collision = con.sql(f""" + SELECT COUNT(*) FROM new_pids np + JOIN {SRC} s ON s.pid = np.pid AND s.otype='MaterialSampleRecord' + """).fetchone()[0] + if n_pid_collision: + sys.exit(f"FATAL: {n_pid_collision} 'new' pids already exist in src wide (with different n). 
" + f"This would create duplicate pids in output.") + + # ---- Phase B: extract new MSR rows + full entity subgraph --------------- + log("extracting entity subgraph for new pids...", t0) + con.execute(f""" + -- New MSR rows from Eric's wide + CREATE TEMP TABLE new_msr_eric AS + SELECT e.* + FROM {OC} e + WHERE e.otype='MaterialSampleRecord' AND e.pid IN (SELECT pid FROM new_pids); + + -- Linked SamplingEvent row_ids + CREATE TEMP TABLE se_ids AS + SELECT DISTINCT u.se_id AS eric_row_id + FROM new_msr_eric, UNNEST(p__produced_by) AS u(se_id); + + -- SamplingEvent rows + CREATE TEMP TABLE new_se_eric AS + SELECT e.* FROM {OC} e + WHERE e.otype='SamplingEvent' AND e.row_id IN (SELECT eric_row_id FROM se_ids); + + -- GeospatialCoordLocation ids from SE (p__sample_location) + CREATE TEMP TABLE geo_from_se AS + SELECT DISTINCT u.geo_id AS eric_row_id + FROM new_se_eric, UNNEST(p__sample_location) AS u(geo_id); + + -- SamplingSite ids from SE (p__sampling_site) + CREATE TEMP TABLE site_ids AS + SELECT DISTINCT u.site_id AS eric_row_id + FROM new_se_eric, UNNEST(p__sampling_site) AS u(site_id); + + -- SamplingSite rows + CREATE TEMP TABLE new_site_eric AS + SELECT e.* FROM {OC} e + WHERE e.otype='SamplingSite' AND e.row_id IN (SELECT eric_row_id FROM site_ids); + + -- GeospatialCoordLocation ids from SamplingSite (p__site_location) + CREATE TEMP TABLE geo_from_site AS + SELECT DISTINCT u.loc_id AS eric_row_id + FROM new_site_eric, UNNEST(p__site_location) AS u(loc_id); + + -- All unique GeoCoordLoc ids (union of SE-linked and site-linked) + CREATE TEMP TABLE all_geo_ids AS + SELECT eric_row_id FROM geo_from_se + UNION + SELECT eric_row_id FROM geo_from_site; + + -- GeoCoordLoc rows + CREATE TEMP TABLE new_geo_eric AS + SELECT e.* FROM {OC} e + WHERE e.otype='GeospatialCoordLocation' AND e.row_id IN (SELECT eric_row_id FROM all_geo_ids); + + -- Agent ids from MSR (p__registrant AND p__responsibility — both columns ref Agents) + CREATE TEMP TABLE agent_ids AS + SELECT 
DISTINCT u.agent_id AS eric_row_id + FROM new_msr_eric, UNNEST(p__registrant) AS u(agent_id) + UNION + SELECT DISTINCT u.agent_id AS eric_row_id + FROM new_msr_eric, UNNEST(p__responsibility) AS u(agent_id); + + -- Agent rows + CREATE TEMP TABLE new_agent_eric AS + SELECT e.* FROM {OC} e + WHERE e.otype='Agent' AND e.row_id IN (SELECT eric_row_id FROM agent_ids); + """) + + counts = { + "new_msr": con.sql("SELECT COUNT(*) FROM new_msr_eric").fetchone()[0], + "new_se": con.sql("SELECT COUNT(*) FROM new_se_eric").fetchone()[0], + "new_geo": con.sql("SELECT COUNT(*) FROM new_geo_eric").fetchone()[0], + "new_site": con.sql("SELECT COUNT(*) FROM new_site_eric").fetchone()[0], + "new_agent": con.sql("SELECT COUNT(*) FROM new_agent_eric").fetchone()[0], + } + log(f"subgraph: msr={counts['new_msr']:,} se={counts['new_se']:,} geo={counts['new_geo']:,} " + f"site={counts['new_site']:,} agent={counts['new_agent']:,}", t0) + + # ---- Phase C: assign new row_ids ---------------------------------------- + max_src_row_id = con.sql(f"SELECT COALESCE(MAX(row_id), 0) FROM {SRC}").fetchone()[0] + log(f"src max_row_id={max_src_row_id:,}", t0) + + # All new entities in one table, ordered deterministically by (otype, pid) + # for stable dense-rank assignment + con.execute(f""" + CREATE TEMP TABLE all_new_entities AS + SELECT row_id AS eric_row_id, pid, otype FROM new_msr_eric + UNION ALL + SELECT row_id, pid, otype FROM new_se_eric + UNION ALL + SELECT row_id, pid, otype FROM new_geo_eric + UNION ALL + SELECT row_id, pid, otype FROM new_site_eric + UNION ALL + SELECT row_id, pid, otype FROM new_agent_eric; + + CREATE TEMP TABLE eric_id_map AS + SELECT eric_row_id, + {max_src_row_id} + DENSE_RANK() OVER (ORDER BY otype, pid) AS our_row_id + FROM all_new_entities; + """) + + n_id_map = con.sql("SELECT COUNT(*) FROM eric_id_map").fetchone()[0] + new_max = con.sql("SELECT MAX(our_row_id) FROM eric_id_map").fetchone()[0] + # Verify no collision with src + n_collision = con.sql(f""" + SELECT 
COUNT(*) FROM eric_id_map m + WHERE m.our_row_id IN (SELECT row_id FROM {SRC}) + """).fetchone()[0] + if n_collision: + sys.exit(f"FATAL: {n_collision} proposed new row_ids collide with existing src row_ids") + # FIX 2: verify our_row_id uniqueness within eric_id_map. + # DENSE_RANK() over (otype, pid) is unique when all (otype, pid) pairs are distinct. + # If a duplicate (otype, pid) pair sneaked through, two eric_row_ids would share + # the same our_row_id — silently producing colliding row_ids in the output. + n_dup_our_row_id = con.sql(""" + SELECT COUNT(*) FROM ( + SELECT our_row_id FROM eric_id_map + GROUP BY our_row_id HAVING COUNT(*) > 1 + ) + """).fetchone()[0] + if n_dup_our_row_id: + dup_examples = con.sql(""" + SELECT our_row_id, COUNT(*) AS cnt FROM eric_id_map + GROUP BY our_row_id HAVING COUNT(*) > 1 ORDER BY cnt DESC LIMIT 5 + """).fetchall() + sys.exit( + f"FATAL: {n_dup_our_row_id} duplicate our_row_id values in id_map " + f"(duplicate (otype,pid) pairs in new entity set). Examples: {dup_examples}" + ) + log(f"id_map: {n_id_map:,} entries, new row_id range {max_src_row_id+1} to {new_max:,}, " + f"collisions={n_collision}, dup_our_row_ids={n_dup_our_row_id}", t0) + + # ---- Phase D: concept resolution for p__has_* dims --------------------- + # OC concept row_ids (Eric's space) -> URI -> our row_id + # Uses same approach as enrich_wide_with_oc_concepts.py + con.execute(f""" + CREATE TEMP TABLE oc_concept_rows AS + SELECT row_id AS eric_row_id, pid AS uri + FROM {OC} WHERE otype='IdentifiedConcept'; + + CREATE TEMP TABLE src_concept_map AS + SELECT pid AS uri, MIN(row_id) AS our_row_id + FROM {SRC} WHERE otype='IdentifiedConcept' GROUP BY pid; + """) + + # Find concepts referenced by new MSRs that are missing from src. + # Includes p__keywords so keyword IdentifiedConcept rows are minted if absent. 
+ con.execute(f""" + CREATE TEMP TABLE new_concept_refs AS + SELECT DISTINCT u.cid AS eric_cid + FROM new_msr_eric, UNNEST(p__has_material_category) AS u(cid) + UNION + SELECT DISTINCT u.cid FROM new_msr_eric, UNNEST(p__has_sample_object_type) AS u(cid) + UNION + SELECT DISTINCT u.cid FROM new_msr_eric, UNNEST(p__has_context_category) AS u(cid) + UNION + SELECT DISTINCT u.cid FROM new_msr_eric, UNNEST(p__keywords) AS u(cid); + + CREATE TEMP TABLE new_concept_uris AS + SELECT DISTINCT c.uri + FROM new_concept_refs r + JOIN oc_concept_rows c ON c.eric_row_id = r.eric_cid; + """) + + n_unresolved_uris = con.sql(""" + SELECT COUNT(*) FROM new_concept_uris u + LEFT JOIN src_concept_map m ON m.uri = u.uri + WHERE m.our_row_id IS NULL + """).fetchone()[0] + + if n_unresolved_uris: + # These need to be minted — expected: only earthsurface when base is 202606. + missing = con.sql(""" + SELECT u.uri FROM new_concept_uris u + LEFT JOIN src_concept_map m ON m.uri = u.uri + WHERE m.our_row_id IS NULL + ORDER BY u.uri + """).fetchall() + log(f"minting {n_unresolved_uris} new IdentifiedConcept rows: {[r[0] for r in missing]}", t0) + else: + log("all concept URIs already in src", t0) + + # Mint new concept rows + max_src_row_id_with_map = con.sql("SELECT MAX(our_row_id) FROM eric_id_map").fetchone()[0] + con.execute(f""" + CREATE TEMP TABLE new_concepts_to_mint AS + WITH missing_uris AS ( + SELECT u.uri FROM new_concept_uris u + LEFT JOIN src_concept_map m ON m.uri = u.uri + WHERE m.our_row_id IS NULL + ), + meta AS ( + SELECT c.uri, MIN(c2.label) AS label, MIN(c2.scheme_name) AS scheme_name, + MIN(c2.scheme_uri) AS scheme_uri + FROM missing_uris c + JOIN (SELECT pid AS uri, label, scheme_name, scheme_uri FROM {OC} + WHERE otype='IdentifiedConcept') c2 ON c2.uri = c.uri + GROUP BY c.uri + ) + SELECT {max_src_row_id_with_map} + DENSE_RANK() OVER (ORDER BY m.uri) AS our_row_id, + m.uri, m.label, m.scheme_name, m.scheme_uri + FROM meta m; + + -- Complete concept lookup: src existing 
+ newly minted + CREATE TEMP TABLE concept_id_lookup AS + SELECT uri, our_row_id FROM src_concept_map + UNION ALL + SELECT uri, our_row_id FROM new_concepts_to_mint; + """) + + n_minted = con.sql("SELECT COUNT(*) FROM new_concepts_to_mint").fetchone()[0] + log(f"minted_concepts={n_minted}", t0) + + # ---- Phase E: build coord table for new MSRs ---------------------------- + # Eric's wide stores geometry as DuckDB GEOMETRY type (spatial extension auto-decodes). + # Our wide stores geometry as BLOB (WKB bytes). Convert with ST_AsWKB() so the + # UNION ALL with src rows (BLOB) does not fail with BLOB->GEOMETRY cast error. + con.execute(""" + CREATE TEMP TABLE new_msr_coords AS + WITH msr_se AS ( + SELECT m.pid, + se.row_id AS se_eric_row_id, + se.p__sample_location + FROM new_msr_eric m, + UNNEST(m.p__produced_by) AS u(se_rid) + JOIN new_se_eric se ON se.row_id = u.se_rid + ) + SELECT ms.pid, + CASE WHEN geo.geometry IS NOT NULL + THEN ST_AsWKB(geo.geometry)::BLOB + ELSE NULL END AS geometry, + geo.latitude, + geo.longitude + FROM msr_se ms, + UNNEST(ms.p__sample_location) AS u(geo_rid) + JOIN new_geo_eric geo ON geo.row_id = u.geo_rid + WHERE geo.latitude IS NOT NULL; + """) + n_coords = con.sql("SELECT COUNT(*) FROM new_msr_coords").fetchone()[0] + n_dup_coords = con.sql( + "SELECT COUNT(*) FROM (SELECT pid FROM new_msr_coords GROUP BY pid HAVING COUNT(*)>1)" + ).fetchone()[0] + log(f"coords: {n_coords:,} pids with coords, {n_dup_coords} duplicate-coord pids", t0) + if n_dup_coords: + sys.exit(f"FATAL: {n_dup_coords} MSR pids have multiple coord rows in the graph path") + + # ---- Phase F: remap p__ arrays for new entities ------------------------- + # Build remapped MSR rows (concept p__ via URI lookup; structural p__ via eric_id_map) + # Using UNNEST WITH ORDINALITY + JOIN + list() aggregation (decorrelated — no correlated subqueries) + log("remapping p__ arrays for new MSR rows...", t0) + + con.execute(""" + -- Pre-aggregate remapped structural arrays for new 
MSRs + -- p__produced_by (SamplingEvent references) + CREATE TEMP TABLE remap_msr_pb AS + SELECT m.pid, + list(idm.our_row_id::BIGINT ORDER BY u.ord) AS remapped + FROM new_msr_eric m, + UNNEST(m.p__produced_by) WITH ORDINALITY AS u(eric_rid, ord) + JOIN eric_id_map idm ON idm.eric_row_id = u.eric_rid + GROUP BY m.pid; + + -- p__has_material_category (concept refs via URI lookup) + CREATE TEMP TABLE remap_msr_mat AS + SELECT m.pid, + list(cl.our_row_id::BIGINT ORDER BY u.ord) AS remapped + FROM new_msr_eric m, + UNNEST(m.p__has_material_category) WITH ORDINALITY AS u(eric_rid, ord) + JOIN oc_concept_rows ocr ON ocr.eric_row_id = u.eric_rid + JOIN concept_id_lookup cl ON cl.uri = ocr.uri + GROUP BY m.pid; + + -- p__has_sample_object_type (concept refs) + CREATE TEMP TABLE remap_msr_obj AS + SELECT m.pid, + list(cl.our_row_id::BIGINT ORDER BY u.ord) AS remapped + FROM new_msr_eric m, + UNNEST(m.p__has_sample_object_type) WITH ORDINALITY AS u(eric_rid, ord) + JOIN oc_concept_rows ocr ON ocr.eric_row_id = u.eric_rid + JOIN concept_id_lookup cl ON cl.uri = ocr.uri + GROUP BY m.pid; + + -- p__has_context_category (concept refs) + CREATE TEMP TABLE remap_msr_ctx AS + SELECT m.pid, + list(cl.our_row_id::BIGINT ORDER BY u.ord) AS remapped + FROM new_msr_eric m, + UNNEST(m.p__has_context_category) WITH ORDINALITY AS u(eric_rid, ord) + JOIN oc_concept_rows ocr ON ocr.eric_row_id = u.eric_rid + JOIN concept_id_lookup cl ON cl.uri = ocr.uri + GROUP BY m.pid; + + -- p__registrant (Agent refs) + CREATE TEMP TABLE remap_msr_reg AS + SELECT m.pid, + list(idm.our_row_id::BIGINT ORDER BY u.ord) AS remapped + FROM new_msr_eric m, + UNNEST(m.p__registrant) WITH ORDINALITY AS u(eric_rid, ord) + JOIN eric_id_map idm ON idm.eric_row_id = u.eric_rid + GROUP BY m.pid; + + -- p__keywords (concept refs via URI lookup; same pattern as p__has_material_category) + CREATE TEMP TABLE remap_msr_kw AS + SELECT m.pid, + list(cl.our_row_id::BIGINT ORDER BY u.ord) AS remapped + FROM new_msr_eric m, + 
UNNEST(m.p__keywords) WITH ORDINALITY AS u(eric_rid, ord) + JOIN oc_concept_rows ocr ON ocr.eric_row_id = u.eric_rid + JOIN concept_id_lookup cl ON cl.uri = ocr.uri + GROUP BY m.pid; + + -- p__responsibility (Agent or other entity refs) + CREATE TEMP TABLE remap_msr_resp AS + SELECT m.pid, + list(idm.our_row_id::BIGINT ORDER BY u.ord) AS remapped + FROM new_msr_eric m, + UNNEST(m.p__responsibility) WITH ORDINALITY AS u(eric_rid, ord) + JOIN eric_id_map idm ON idm.eric_row_id = u.eric_rid + GROUP BY m.pid; + """) + + # Similarly remap SamplingEvent p__ arrays + con.execute(""" + -- SE p__sample_location (GeoCoordLoc refs) + CREATE TEMP TABLE remap_se_sl AS + SELECT s.pid, + list(idm.our_row_id::BIGINT ORDER BY u.ord) AS remapped + FROM new_se_eric s, + UNNEST(s.p__sample_location) WITH ORDINALITY AS u(eric_rid, ord) + JOIN eric_id_map idm ON idm.eric_row_id = u.eric_rid + GROUP BY s.pid; + + -- SE p__sampling_site (SamplingSite refs) + CREATE TEMP TABLE remap_se_ss AS + SELECT s.pid, + list(idm.our_row_id::BIGINT ORDER BY u.ord) AS remapped + FROM new_se_eric s, + UNNEST(s.p__sampling_site) WITH ORDINALITY AS u(eric_rid, ord) + JOIN eric_id_map idm ON idm.eric_row_id = u.eric_rid + GROUP BY s.pid; + + -- SamplingSite p__site_location (GeoCoordLoc refs) + CREATE TEMP TABLE remap_site_sl AS + SELECT s.pid, + list(idm.our_row_id::BIGINT ORDER BY u.ord) AS remapped + FROM new_site_eric s, + UNNEST(s.p__site_location) WITH ORDINALITY AS u(eric_rid, ord) + JOIN eric_id_map idm ON idm.eric_row_id = u.eric_rid + GROUP BY s.pid; + """) + log("p__ remapping tables built", t0) + + # ---- trust checks before writing ---------------------------------------- + log("running pre-write trust checks...", t0) + + # Check all new MSR p__produced_by refs resolve + n_unresolved_se = con.sql(""" + SELECT COUNT(*) FROM new_msr_eric m, UNNEST(m.p__produced_by) AS u(rid) + LEFT JOIN eric_id_map idm ON idm.eric_row_id = u.rid + WHERE idm.our_row_id IS NULL + """).fetchone()[0] + if 
n_unresolved_se: + sys.exit(f"FATAL: {n_unresolved_se} p__produced_by references in new MSRs do not resolve") + + # Check all concept references resolve (via URI) — includes p__keywords + n_unresolved_concepts = con.sql(""" + WITH all_refs AS ( + SELECT m.pid, u.eric_rid FROM new_msr_eric m, UNNEST(m.p__has_material_category) AS u(eric_rid) + UNION ALL + SELECT m.pid, u.eric_rid FROM new_msr_eric m, UNNEST(m.p__has_sample_object_type) AS u(eric_rid) + UNION ALL + SELECT m.pid, u.eric_rid FROM new_msr_eric m, UNNEST(m.p__has_context_category) AS u(eric_rid) + UNION ALL + SELECT m.pid, u.eric_rid FROM new_msr_eric m, UNNEST(m.p__keywords) AS u(eric_rid) + ) + SELECT COUNT(*) FROM all_refs r + LEFT JOIN oc_concept_rows ocr ON ocr.eric_row_id = r.eric_rid + LEFT JOIN concept_id_lookup cl ON cl.uri = ocr.uri + WHERE cl.our_row_id IS NULL + """).fetchone()[0] + if n_unresolved_concepts: + sys.exit(f"FATAL: {n_unresolved_concepts} concept references (including keywords) in new MSRs do not resolve") + + # Check that rows_to_remove doesn't contain any non-OC rows + n_non_oc_removal = con.sql(f""" + SELECT COUNT(*) FROM rows_to_remove rr + JOIN {SRC} s ON s.row_id = rr.row_id AND s.otype='MaterialSampleRecord' + WHERE s.n != '{OC_SOURCE}' + """).fetchone()[0] + if n_non_oc_removal: + sys.exit(f"FATAL: {n_non_oc_removal} removal targets are non-OC MSR rows (would corrupt other sources)") + + # FIX B — SILENT-DROP GUARD: verify that every p__* source array on new rows + # has a 1:1 remapped array (no silently-dropped refs due to inner-join misses). + # + # The remapping tables (remap_msr_pb, remap_se_sl, remap_se_ss, remap_site_sl) + # use INNER JOINs to eric_id_map. If a source row has a ref not in eric_id_map, + # that row simply DISAPPEARS from the remap table, and the LEFT JOIN in the + # write SQL gives NULL for the column — silently dropping the reference. 
#
# For each (source_table, p__col, remap_table) pair, we assert:
# every row with a non-null source array has a matching remap row AND
# the remapped array has the same length as the source array.
# Any mismatch → RuntimeError, build aborted.
def _check_remap_length(source_table, pid_col, src_col, remap_table, remap_col, label):
    """Abort the build when a remapped reference array differs in length from its source.

    For every row of ``source_table`` whose ``src_col`` array is non-NULL and
    non-empty, the length of that array is compared with the length of
    ``remap_col`` in the ``remap_table`` row joined on ``pid_col``. A missing
    remap row is treated as length 0. Rows with a NULL or empty source array
    are not checked.

    The query runs on ``con`` from the enclosing scope.

    Args:
        source_table: table holding the original reference arrays.
        pid_col: join key column present in both tables.
        src_col: array column in ``source_table``.
        remap_table: table holding the remapped arrays.
        remap_col: array column in ``remap_table``.
        label: tag placed in the error message to identify the check.

    Raises:
        RuntimeError: if at least one checked row has differing lengths. The
            message reports the number of offending rows and up to five of them.
    """
    mismatch_sql = f"""
        SELECT s.{pid_col}, len(s.{src_col}) AS src_len, COALESCE(len(r.{remap_col}), 0) AS remap_len
        FROM {source_table} s
        LEFT JOIN {remap_table} r ON r.{pid_col} = s.{pid_col}
        WHERE s.{src_col} IS NOT NULL AND len(s.{src_col}) > 0
          AND COALESCE(len(r.{remap_col}), 0) != len(s.{src_col})
    """
    mismatches = con.sql(mismatch_sql).fetchall()
    if not mismatches:
        return

    # Spell out at most five offenders; the reported count still covers all of them.
    offenders = [
        f"{pid_col}={pid} src_len={src_len} remap_len={remap_len}"
        for pid, src_len, remap_len in mismatches[:5]
    ]
    details = "; ".join(offenders)
    raise RuntimeError(
        f"SILENT-DROP GUARD FAIL [{label}]: {len(mismatches)} rows have mismatched "
        f"source vs remapped array lengths. First offenders: {details}. "
        f"Check that all referenced entities were extracted before remapping."
    )

# MSR structural refs
_check_remap_length("new_msr_eric", "pid", "p__produced_by", "remap_msr_pb", "remapped", "MSR.p__produced_by")
_check_remap_length("new_msr_eric", "pid", "p__registrant", "remap_msr_reg", "remapped", "MSR.p__registrant")
_check_remap_length("new_msr_eric", "pid", "p__responsibility", "remap_msr_resp", "remapped", "MSR.p__responsibility")
# MSR concept refs (via URI lookup — p__has_* dims + p__keywords)
_check_remap_length("new_msr_eric", "pid", "p__has_material_category", "remap_msr_mat", "remapped", "MSR.p__has_material_category")
_check_remap_length("new_msr_eric", "pid", "p__has_sample_object_type", "remap_msr_obj", "remapped", "MSR.p__has_sample_object_type")
_check_remap_length("new_msr_eric", "pid", "p__has_context_category", "remap_msr_ctx", "remapped", "MSR.p__has_context_category")
_check_remap_length("new_msr_eric", "pid", "p__keywords", "remap_msr_kw", "remapped", "MSR.p__keywords")
# SE structural refs
_check_remap_length("new_se_eric", "pid", "p__sample_location", "remap_se_sl", "remapped", "SE.p__sample_location")
_check_remap_length("new_se_eric", "pid", "p__sampling_site", "remap_se_ss", "remapped", "SE.p__sampling_site")
# SamplingSite structural refs
_check_remap_length("new_site_eric", "pid", "p__site_location", "remap_site_sl", "remapped", "Site.p__site_location")
log("silent-drop guard: all structural + concept remapped arrays length-verified (PASS)", t0)

log("trust checks passed", t0)

# ---- compute expected output row count -----------------------------------
n_src = con.sql(f"SELECT COUNT(*) FROM {SRC}").fetchone()[0]
n_new_entities = n_id_map  # entities in eric_id_map
n_out_expected = n_src - n_rows_to_remove + n_new_entities + n_minted
log(f"expected output rows: {n_src:,} src - {n_rows_to_remove:,} removed + "
    f"{n_new_entities:,} new entities + {n_minted} concepts = {n_out_expected:,}", t0)

if args.dry_run:
    log("DRY RUN: skipping write step", t0)
print("\n=== DRY RUN SUMMARY ===") + print(f" removed_pids: {n_removed_pids:,}") + print(f" orphan_rows: {total_orphan_rows - n_removed_pids:,}") + print(f" total_rows_removed: {n_rows_to_remove:,}") + print(f" new_pids: {n_new_pids:,}") + print(f" new_entities: {n_new_entities:,}") + print(f" minted_concepts: {n_minted}") + print(f" expected_out: {n_out_expected:,}") + print(f" trust_checks: PASS") + return 0 + + # ---- Phase I: write output ----------------------------------------------- + log("writing output...", t0) + + # Build the column list for new MSR rows + # For each column in src_cols, produce an expression that maps Eric's data to our schema + # The key transformations: + # row_id -> from eric_id_map + # n -> 'OPENCONTEXT' + # geometry/latitude/longitude -> from new_msr_coords + # p__produced_by -> from remap_msr_pb + # p__has_material_category -> from remap_msr_mat + # p__has_sample_object_type -> from remap_msr_obj + # p__has_context_category -> from remap_msr_ctx + # p__registrant -> from remap_msr_reg + # p__keywords -> from remap_msr_kw + # p__responsibility -> from remap_msr_resp + # p__curation -> NULL + # p__related_resource -> NULL + # all others -> direct from new_msr_eric + + # New MSR rows SELECT + msr_select_cols = [] + for col, typ in src_cols: + if col == "row_id": + msr_select_cols.append(f"idm.our_row_id::BIGINT AS row_id") + elif col == "n": + msr_select_cols.append(f"'{OC_SOURCE}'::VARCHAR AS n") + elif col == "geometry": + msr_select_cols.append(f"coords.geometry AS geometry") + elif col == "latitude": + msr_select_cols.append(f"coords.latitude AS latitude") + elif col == "longitude": + msr_select_cols.append(f"coords.longitude AS longitude") + elif col == "p__produced_by": + msr_select_cols.append(f"rmap_pb.remapped::{typ} AS p__produced_by") + elif col == "p__has_material_category": + msr_select_cols.append(f"rmap_mat.remapped::{typ} AS p__has_material_category") + elif col == "p__has_sample_object_type": + 
msr_select_cols.append(f"rmap_obj.remapped::{typ} AS p__has_sample_object_type") + elif col == "p__has_context_category": + msr_select_cols.append(f"rmap_ctx.remapped::{typ} AS p__has_context_category") + elif col == "p__registrant": + msr_select_cols.append(f"rmap_reg.remapped::{typ} AS p__registrant") + elif col == "p__keywords": + msr_select_cols.append(f"rmap_kw.remapped::{typ} AS p__keywords") + elif col == "p__responsibility": + msr_select_cols.append(f"rmap_resp.remapped::{typ} AS p__responsibility") + elif col in OUR_ONLY_COLS: + msr_select_cols.append(f"NULL::{typ} AS {col}") + elif col in oc_colnames: + msr_select_cols.append(f"m.{col}::{typ} AS {col}") + else: + msr_select_cols.append(f"NULL::{typ} AS {col}") + + msr_select = ",\n ".join(msr_select_cols) + + # New SE rows SELECT (remapped p__sample_location and p__sampling_site) + se_select_cols = [] + for col, typ in src_cols: + if col == "row_id": + se_select_cols.append(f"idm.our_row_id::BIGINT AS row_id") + elif col == "p__sample_location": + se_select_cols.append(f"rmap_sl.remapped::{typ} AS p__sample_location") + elif col == "p__sampling_site": + se_select_cols.append(f"rmap_ss.remapped::{typ} AS p__sampling_site") + elif col in OUR_ONLY_COLS: + se_select_cols.append(f"NULL::{typ} AS {col}") + elif col in oc_colnames: + se_select_cols.append(f"s.{col}::{typ} AS {col}") + else: + se_select_cols.append(f"NULL::{typ} AS {col}") + se_select = ",\n ".join(se_select_cols) + + # New SamplingSite rows SELECT (remapped p__site_location) + site_select_cols = [] + for col, typ in src_cols: + if col == "row_id": + site_select_cols.append(f"idm.our_row_id::BIGINT AS row_id") + elif col == "p__site_location": + site_select_cols.append(f"rmap_site_sl.remapped::{typ} AS p__site_location") + elif col in OUR_ONLY_COLS: + site_select_cols.append(f"NULL::{typ} AS {col}") + elif col in oc_colnames: + site_select_cols.append(f"st.{col}::{typ} AS {col}") + else: + site_select_cols.append(f"NULL::{typ} AS {col}") + 
site_select = ",\n ".join(site_select_cols) + + # Generic entity SELECT (Geo, Agent: just row_id remapped, all other cols direct) + # geometry: Eric's wide has GEOMETRY type (spatial extension), our wide has BLOB (WKB). + # Convert with ST_AsWKB() for GEOMETRY-typed columns; BLOB columns pass through directly. + def generic_entity_select(alias, table_alias, eric_geo_is_geometry=False): + parts = [] + for col, typ in src_cols: + if col == "row_id": + parts.append(f"idm.our_row_id::BIGINT AS row_id") + elif col == "geometry" and eric_geo_is_geometry: + # GeoCoordLoc in Eric's wide stores geometry as GEOMETRY type + parts.append( + f"CASE WHEN {table_alias}.geometry IS NOT NULL " + f"THEN ST_AsWKB({table_alias}.geometry)::BLOB " + f"ELSE NULL END AS geometry" + ) + elif col in OUR_ONLY_COLS: + parts.append(f"NULL::{typ} AS {col}") + elif col in oc_colnames: + parts.append(f"{table_alias}.{col}::{typ} AS {col}") + else: + parts.append(f"NULL::{typ} AS {col}") + return ",\n ".join(parts) + + # GeoCoordLoc in Eric's wide has geometry as GEOMETRY type (auto-decoded by spatial extension) + geo_select = generic_entity_select("g", "g", eric_geo_is_geometry=True) + agent_select = generic_entity_select("a", "a") + + # Minted concept rows SELECT + concept_select_cols = [] + for col, typ in src_cols: + mapping = { + "row_id": f"nc.our_row_id::BIGINT", + "pid": "nc.uri::VARCHAR", + "otype": "'IdentifiedConcept'::VARCHAR", + "label": "nc.label::VARCHAR", + "scheme_name": "nc.scheme_name::VARCHAR", + "scheme_uri": "nc.scheme_uri::VARCHAR", + } + if col in mapping: + concept_select_cols.append(f"{mapping[col]} AS {col}") + else: + concept_select_cols.append(f"NULL::{typ} AS {col}") + concept_select = ",\n ".join(concept_select_cols) + + write_sql = f""" + COPY ( + -- 1. Surviving src rows (all rows NOT in the removal set) + SELECT * FROM {SRC} + WHERE row_id NOT IN (SELECT row_id FROM rows_to_remove) + + UNION ALL BY NAME + + -- 2. 
New MaterialSampleRecord rows (remapped + denormalized coords) + SELECT {msr_select} + FROM new_msr_eric m + JOIN eric_id_map idm ON idm.eric_row_id = m.row_id + LEFT JOIN new_msr_coords coords ON coords.pid = m.pid + LEFT JOIN remap_msr_pb rmap_pb ON rmap_pb.pid = m.pid + LEFT JOIN remap_msr_mat rmap_mat ON rmap_mat.pid = m.pid + LEFT JOIN remap_msr_obj rmap_obj ON rmap_obj.pid = m.pid + LEFT JOIN remap_msr_ctx rmap_ctx ON rmap_ctx.pid = m.pid + LEFT JOIN remap_msr_reg rmap_reg ON rmap_reg.pid = m.pid + LEFT JOIN remap_msr_kw rmap_kw ON rmap_kw.pid = m.pid + LEFT JOIN remap_msr_resp rmap_resp ON rmap_resp.pid = m.pid + + UNION ALL BY NAME + + -- 3. New SamplingEvent rows (remapped structural arrays) + SELECT {se_select} + FROM new_se_eric s + JOIN eric_id_map idm ON idm.eric_row_id = s.row_id + LEFT JOIN remap_se_sl rmap_sl ON rmap_sl.pid = s.pid + LEFT JOIN remap_se_ss rmap_ss ON rmap_ss.pid = s.pid + + UNION ALL BY NAME + + -- 4. New GeospatialCoordLocation rows (just row_id remapped) + SELECT {geo_select} + FROM new_geo_eric g + JOIN eric_id_map idm ON idm.eric_row_id = g.row_id + + UNION ALL BY NAME + + -- 5. New SamplingSite rows (remapped p__site_location) + SELECT {site_select} + FROM new_site_eric st + JOIN eric_id_map idm ON idm.eric_row_id = st.row_id + LEFT JOIN remap_site_sl rmap_site_sl ON rmap_site_sl.pid = st.pid + + UNION ALL BY NAME + + -- 6. New Agent rows + SELECT {agent_select} + FROM new_agent_eric a + JOIN eric_id_map idm ON idm.eric_row_id = a.row_id + + UNION ALL BY NAME + + -- 7. 
Minted IdentifiedConcept rows + SELECT {concept_select} + FROM new_concepts_to_mint nc + + ORDER BY row_id + ) TO '{args.out}' (FORMAT PARQUET, COMPRESSION ZSTD) + """ + + con.execute(write_sql) + log(f"wrote {args.out}", t0) + + # ---- post-write verification -------------------------------------------- + OUT = f"read_parquet('{args.out}')" + n_out = con.sql(f"SELECT COUNT(*) FROM {OUT}").fetchone()[0] + if n_out != n_out_expected: + sys.exit(f"FATAL: row count {n_out:,} != expected {n_out_expected:,}. " + f"(src={n_src:,} - removed={n_rows_to_remove:,} + " + f"new={n_new_entities:,} + minted={n_minted})") + + n_dup_out_rowid = con.sql( + f"SELECT COUNT(*) FROM (SELECT row_id FROM {OUT} GROUP BY row_id HAVING COUNT(*)>1)" + ).fetchone()[0] + if n_dup_out_rowid: + sys.exit(f"FATAL: {n_dup_out_rowid} duplicate row_ids in output") + + n_dup_out_pid = con.sql( + f"SELECT COUNT(*) FROM (SELECT pid FROM {OUT} WHERE otype='MaterialSampleRecord' " + f"GROUP BY pid HAVING COUNT(*)>1)" + ).fetchone()[0] + if n_dup_out_pid: + sys.exit(f"FATAL: {n_dup_out_pid} duplicate MaterialSampleRecord pids in output") + + # Verify n='OPENCONTEXT' on ALL new MSR rows in output + n_wrong_n = con.sql(f""" + SELECT COUNT(*) FROM {OUT} + WHERE otype='MaterialSampleRecord' AND n!='{OC_SOURCE}' + AND pid IN (SELECT pid FROM new_pids) + """).fetchone()[0] + if n_wrong_n: + sys.exit(f"FATAL: {n_wrong_n} new MSR rows have n != '{OC_SOURCE}'") + + # Verify NONE of the removed pids remain in output + n_stale_pids_remain = con.sql(f""" + SELECT COUNT(*) FROM {OUT} + WHERE otype='MaterialSampleRecord' AND n='{OC_SOURCE}' + AND pid IN (SELECT pid FROM removed_pids) + """).fetchone()[0] + if n_stale_pids_remain: + sys.exit(f"FATAL: {n_stale_pids_remain} stale (removed) pids remain in output") + + out_oc_count = con.sql( + f"SELECT COUNT(*) FROM {OUT} WHERE otype='MaterialSampleRecord' AND n='{OC_SOURCE}'" + ).fetchone()[0] + log(f"post-write: rows={n_out:,} dup_rowids={n_dup_out_rowid} " + 
f"dup_pids={n_dup_out_pid} oc_msrs={out_oc_count:,} " + f"stale_remain={n_stale_pids_remain} n_check=PASS", t0) + + # ---- Mandatory in-script dangling-ref gate: ALL rows, ALL p__* columns ---- + # Scan EVERY p__* array column across the ENTIRE output (not just new rows). + # Surviving src rows must also be checked because orphan deletion can create + # dangling refs in old rows (e.g. surviving SamplingSites whose p__site_location + # pointed at geos that were incorrectly deleted as orphans). + # This is a HARD FAIL if any dangling ref is found — build aborted. + log("running mandatory dangling-ref gate on ALL rows, ALL p__* columns...", t0) + p_ref_cols = [ + col for col, typ in src_cols + if col.startswith("p__") + and any(t in typ.upper() for t in ("BIGINT", "INTEGER")) + ] + n_total_dangling = 0 + dangling_details = {} + out_row_ids_subq = f"SELECT row_id FROM {OUT}" + for p_col in p_ref_cols: + # Check ALL rows in output (both surviving src rows and new rows) + n_dangle = con.sql(f""" + WITH all_row_ids AS ({out_row_ids_subq}), + refs AS ( + SELECT unnest(w.{p_col}) AS ref_id + FROM {OUT} w + WHERE w.{p_col} IS NOT NULL AND len(w.{p_col}) > 0 + ) + SELECT COUNT(*) + FROM refs + LEFT JOIN all_row_ids ON refs.ref_id = all_row_ids.row_id + WHERE all_row_ids.row_id IS NULL + """).fetchone()[0] + dangling_details[p_col] = n_dangle + n_total_dangling += n_dangle + for col, cnt in sorted(dangling_details.items()): + print(f" {col}: {cnt} dangling refs", flush=True) + if n_total_dangling: + raise RuntimeError( + f"INTEGRITY FAIL: {n_total_dangling} dangling references in output. " + f"Per-column: {dangling_details}. Build aborted — do NOT emit manifest." + ) + log(f"Dangling ref check: PASS (0 dangling across {len(p_ref_cols)} columns)", t0) + + # ---- Phase J: description enrichment (#277) --------------------------------- + # OC sample descriptions in the combined wide are terse LD metadata strings + # ('updated': 2023-10-05...) 
instead of the human-readable site-path strings + # ('Open Context published "Sample" from: Europe/Cyprus/PKAP Survey Area/...') + # present in Eric's OC wide. Overwrite `description` for ALL OC MSR pids in + # the output from Eric's wide. This covers both existing ~1.04M pids that + # survived the sync and the 67,187 newly added pids. + # + # Implementation: single DuckDB COPY rewriting only the description column + # for OC MSR rows; all other columns and all non-OC rows pass through as-is. + # Row counts are invariant (JOIN on pid, not a filter). + log("description enrichment (#277): copying OC descriptions from Eric's wide…", t0) + + tmp_enriched = args.out + ".enriching.tmp" + # Use a UNION ALL approach for efficiency: join only OC MSR rows (1.1M) with + # Eric's wide for descriptions, then pass all non-OC rows through unchanged. + # This avoids a full-scan LEFT JOIN on 20M rows (which materializes the full + # wide in memory and is very slow). Both branches use SELECT * REPLACE for + # schema-agnostic column handling. 
+ con.execute(f""" + COPY ( + -- OC MSR rows: overwrite description from Eric's wide where available + SELECT w.* REPLACE ( + CASE WHEN oc.description IS NOT NULL THEN oc.description ELSE w.description END AS description + ) + FROM read_parquet('{args.out}') w + LEFT JOIN ( + SELECT pid, description FROM {OC} WHERE otype='MaterialSampleRecord' + ) oc ON oc.pid = w.pid + WHERE w.otype='MaterialSampleRecord' AND w.n='{OC_SOURCE}' + + UNION ALL BY NAME + + -- All non-OC rows: pass through unchanged (no description modification) + SELECT * FROM read_parquet('{args.out}') + WHERE NOT (otype='MaterialSampleRecord' AND n='{OC_SOURCE}') + + ORDER BY row_id + ) TO '{tmp_enriched}' (FORMAT PARQUET, COMPRESSION ZSTD) + """) + + # Trust gate: verify row count is unchanged, then check Cyprus count + n_enriched = con.sql(f"SELECT COUNT(*) FROM read_parquet('{tmp_enriched}')").fetchone()[0] + if n_enriched != n_out: + os.unlink(tmp_enriched) + sys.exit(f"FATAL: description enrichment changed row count {n_out:,} → {n_enriched:,}") + + n_cyprus = con.sql( + f"SELECT COUNT(*) FROM read_parquet('{tmp_enriched}') " + f"WHERE otype='MaterialSampleRecord' AND n='{OC_SOURCE}' " + f"AND description ILIKE '%Cyprus%'" + ).fetchone()[0] + log(f"description enrichment trust gate: Cyprus OC MSR count = {n_cyprus:,} (expect ≈ 69,230)", t0) + # Hard trust gate: only applies at production scale (out_oc_count > 1M) to avoid + # false-positive failures on small synthetic fixtures that lack Cyprus descriptions. + # Threshold 69,000 is conservative relative to the observed production count of 69,230. + CYPRUS_THRESHOLD = 69000 + if out_oc_count > 1_000_000 and n_cyprus < CYPRUS_THRESHOLD: + os.unlink(tmp_enriched) + raise RuntimeError( + f"Trust gate FAIL: Cyprus description count {n_cyprus:,} < {CYPRUS_THRESHOLD:,} threshold. " + f"Description enrichment may have failed or OC wide is missing Cyprus data." 
+ ) + + # Atomically replace the output with the enriched version + os.replace(tmp_enriched, args.out) + log(f"description enrichment complete: replaced {args.out}", t0) + + # ---- manifest ----------------------------------------------------------- + if not args.no_manifest: + manifest = { + "script": os.path.basename(__file__), + "argv": sys.argv, + "git_sha": git_sha(), + "duckdb_version": duckdb.__version__, + "policy": ("TRUE SYNC: add new OC pids + remove stale OC pids from Eric's fresh OC PQG wide " + "(#272 phase 2, D3 decision 2026-06-12)"), + "inputs": { + "src": {"path": args.src, "bytes": os.path.getsize(args.src), + "sha256": sha256_file(args.src)}, + "oc_wide": {"path": args.oc_wide, "bytes": os.path.getsize(args.oc_wide), + "sha256": sha256_file(args.oc_wide)}, + }, + "counts": { + "src_rows": n_src, + "removed_pids": n_removed_pids, + "orphan_rows": total_orphan_rows - n_removed_pids, + "total_rows_removed": n_rows_to_remove, + "orphan_breakdown": orphan_counts, + "new_pids": n_new_pids, + "new_entity_rows": n_new_entities, + "minted_concepts": n_minted, + "out_rows": n_out, + "new_oc_msr_total": out_oc_count, + "entity_breakdown": counts, + }, + "output": {"path": args.out, "bytes": os.path.getsize(args.out), + "sha256": sha256_file(args.out)}, + } + mpath = args.out + ".manifest.json" + with open(mpath, "w") as fh: + json.dump(manifest, fh, indent=2) + log(f"manifest -> {mpath}", t0) + + log("done", t0) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/validate_frontend_derived.py b/scripts/validate_frontend_derived.py index 9976e261..110ea025 100755 --- a/scripts/validate_frontend_derived.py +++ b/scripts/validate_frontend_derived.py @@ -112,8 +112,9 @@ def scalar(sql): check("facets.pid == map_lite.pid", diff == 0, f"{diff} pids differ between facets and map_lite") # --- 5. 
ALGEBRA: facet_summaries == GROUP BY facets (per dim) --- + # NOTE: build_frontend_derived.py filters both NULL and empty-string values (#283a fix). recompute = " UNION ALL ".join( - f"SELECT '{d}' AS facet_type, {d} AS facet_value, COUNT(*) AS c FROM {F} WHERE {d} IS NOT NULL GROUP BY {d}" + f"SELECT '{d}' AS facet_type, {d} AS facet_value, COUNT(*) AS c FROM {F} WHERE {d} IS NOT NULL AND {d} <> '' GROUP BY {d}" for d in ("source", "material", "context", "object_type")) mismatch = scalar(f""" WITH recomputed AS ({recompute}), @@ -125,15 +126,19 @@ def scalar(sql): check("facet_summaries == GROUP BY facets", mismatch == 0, f"{mismatch} (facet_type,value,count) rows disagree") check("facet_summaries.scheme all NULL", scalar(f"SELECT COUNT(*) FROM {S} WHERE scheme IS NOT NULL") == 0, "non-NULL scheme rows (contract: scheme is NULL)") + # --- 5b. blank facet values absent (#283a) — also catches whitespace-only values --- + check("facet_summaries no blank values (#283a)", scalar(f"SELECT COUNT(*) FROM {S} WHERE TRIM(facet_value) = ''") == 0, + "blank/whitespace-only facet_value rows (want 0; caused by GEOME empty-string concept URI)") # --- 6. ALGEBRA: facet_cross_filter single-dim rows == conditional GROUP BY facets --- + # NOTE: build_frontend_derived.py filters both NULL and empty-string values (#283a fix). 
dims = ("source", "material", "context", "object_type") parts = [] for filt in dims: for fd in dims: parts.append( f"SELECT '{filt}' AS fcol, {filt} AS fval, '{fd}' AS facet_type, {fd} AS facet_value, COUNT(*) AS c " - f"FROM {F} WHERE {filt} IS NOT NULL AND {fd} IS NOT NULL GROUP BY {filt}, {fd}") + f"FROM {F} WHERE {filt} IS NOT NULL AND {filt} <> '' AND {fd} IS NOT NULL AND {fd} <> '' GROUP BY {filt}, {fd}") recompute_cf = " UNION ALL ".join(parts) # normalize cross_filter single-dim rows into (fcol, fval, facet_type, facet_value, count) cf_single = f""" @@ -191,8 +196,13 @@ def except_diff(asql, bsql): return scalar(f"SELECT (SELECT COUNT(*) FROM (({asql}) EXCEPT ({bsql}))) " f"+ (SELECT COUNT(*) FROM (({bsql}) EXCEPT ({asql})))") - ref_facets = ("SELECT pid, source, material, context, object_type, label, description, " - "place_name::VARCHAR AS place_name FROM samp_geo") + # description in facets_v2 is SEARCH-ONLY: concept labels are appended by + # the builder (#277 part 2). We import the SAME expression from the builder + # so the validator and builder can never drift from each other. 
+ from build_frontend_derived import FACETS_DESCRIPTION_EXPR + ref_facets = (f"SELECT pid, source, material, context, object_type, label, " + f"{FACETS_DESCRIPTION_EXPR} AS description, " + f"place_name::VARCHAR AS place_name FROM samp_geo") file_facets = f"SELECT pid, source, material, context, object_type, label, description, place_name FROM {F}" check("facets == fresh build from --wide", except_diff(ref_facets, file_facets) == 0, "facets rows differ from a re-derivation off the wide (corruption/stale/wrong-version)") diff --git a/tests/test_frontend_derived.py b/tests/test_frontend_derived.py index b97c1af1..3a7ca68a 100644 --- a/tests/test_frontend_derived.py +++ b/tests/test_frontend_derived.py @@ -68,7 +68,7 @@ def build_fixture_wide(path, geom_mode): f"NULL::VARCHAR AS label, NULL::VARCHAR AS description, NULL::VARCHAR[] AS place_name, " f"NULL::TIMESTAMP AS result_time, NULL AS geometry, " f"NULL::BIGINT[] AS p__has_material_category, NULL::BIGINT[] AS p__has_context_category, " - f"NULL::BIGINT[] AS p__has_sample_object_type" + f"NULL::BIGINT[] AS p__has_sample_object_type, NULL::BIGINT[] AS p__keywords" for rid, uri in CONCEPTS) msr = [] @@ -79,13 +79,13 @@ def build_fixture_wide(path, geom_mode): f"'label {pid}' AS label, 'desc {pid}' AS description, ['plc-{pid}','x''q']::VARCHAR[] AS place_name, " f"NULL::TIMESTAMP AS result_time, {geom(lng, lat)} AS geometry, " f"{_arr(marr)} AS p__has_material_category, [10]::BIGINT[] AS p__has_context_category, " - f"[20]::BIGINT[] AS p__has_sample_object_type") + f"[20]::BIGINT[] AS p__has_sample_object_type, NULL::BIGINT[] AS p__keywords") # one NULL-geometry sample -> must be EXCLUDED from located outputs msr.append( "SELECT 'MaterialSampleRecord' AS otype, 'm-nogeo' AS pid, NULL::BIGINT AS row_id, 'TEST' AS n, " "'l' AS label, 'd' AS description, NULL::VARCHAR[] AS place_name, NULL::TIMESTAMP AS result_time, " "NULL AS geometry, [4]::BIGINT[] AS p__has_material_category, [10]::BIGINT[] AS p__has_context_category, 
" - "[20]::BIGINT[] AS p__has_sample_object_type") + "[20]::BIGINT[] AS p__has_sample_object_type, NULL::BIGINT[] AS p__keywords") con.execute(f"COPY ({ic_rows} UNION ALL {' UNION ALL '.join(msr)}) " f"TO '{path}' (FORMAT PARQUET)") diff --git a/tests/test_ingest_oc_records.py b/tests/test_ingest_oc_records.py new file mode 100644 index 00000000..f01a5ac7 --- /dev/null +++ b/tests/test_ingest_oc_records.py @@ -0,0 +1,1683 @@ +"""Fast, AI-free fixture tests for the OC record ingestion (#272 Phase 2). + +Builds tiny synthetic src-wide + oc-wide parquet pairs, runs the real +ingest script against them, and asserts the contract: + + TRUE SYNC behavior (D3 decision): + - New pids (Eric's \ src) are ingested with full entity subgraph + - Stale pids (src \ Eric's) are REMOVED along with orphan subgraph entities + - Shared entities (referenced by both surviving and removed MSRs) are kept + - Surviving non-OC rows are byte-identical + + Entity subgraph: + - MaterialSampleRecord + SamplingEvent + GeospatialCoordLocation + SamplingSite + Agent + - row_id remapping: new entities get deterministic ids starting at max(src)+1 + - p__ arrays remapped from Eric's integer space to our BIGINT space + - geometry denormalized from GeoCoordLoc onto MSR rows (WKB BLOB) + - n='OPENCONTEXT' on new MSR rows (Eric's wide has NULL) + + Trust-gate invariants: + - Hard-fail on duplicate OC MSR pids in Eric's wide + - Hard-fail on new pids that already exist in src wide + - Hard-fail on unresolved p__ references in new rows + - Row count arithmetic verified post-write + - No removed pids remain in output + + Determinism: + - Same inputs → bit-identical output (--no-manifest mode) + +Run: pytest tests/test_ingest_oc_records.py -q (needs: duckdb, spatial, h3) +""" +import hashlib +import json +import os +import subprocess +import sys + +import duckdb +import pytest + +HERE = os.path.dirname(os.path.abspath(__file__)) +REPO = os.path.dirname(HERE) +INGEST = os.path.join(REPO, "scripts", 
"ingest_oc_records.py") + +# Vocabulary URI prefixes for test fixtures +MAT = "https://w3id.org/isample/vocabulary/material/1.0/" +OBJ = "https://w3id.org/isample/vocabulary/materialsampleobjecttype/1.0/" +SF = "https://w3id.org/isample/vocabulary/sampledfeature/1.0/" +ROOT_MAT = MAT + "material" + + +# ---- fixture-building helpers ----------------------------------------------- + +def build_src_wide(path, *, msr_rows, concept_rows, se_rows, geo_rows, + site_rows=None, agent_rows=None, extra_rows=None): + """Build a minimal src wide parquet with the specified entity rows. + + msr_rows: list of dict with keys: row_id, pid, n, p__produced_by (list of ints), + p__has_material_category, p__has_sample_object_type, p__has_context_category + (lists of ints), geometry (WKB BLOB bytes or None), latitude, longitude + concept_rows: list of (row_id, uri) + se_rows: list of (row_id, pid, p__sample_location [list of int], p__sampling_site [list of int]) + geo_rows: list of (row_id, pid, latitude, longitude) — geometry will be ST_AsWKB(ST_Point) + site_rows: list of (row_id, pid, p__site_location [list of int]) + agent_rows: list of (row_id, pid) + """ + con = duckdb.connect() + con.execute("INSTALL spatial; LOAD spatial;") + + def _arr(xs, t="BIGINT[]"): + if xs is None: + return f"NULL::{t}" + return "[" + ",".join(str(x) for x in xs) + f"]::{t}" + + rows = [] + + # Concept rows + for rid, uri in concept_rows: + rows.append( + f"SELECT {rid}::BIGINT AS row_id, '{uri}' AS pid, 'IdentifiedConcept' AS otype, " + f"NULL::VARCHAR AS n, NULL::BLOB AS geometry, NULL::DOUBLE AS latitude, " + f"NULL::DOUBLE AS longitude, NULL::VARCHAR AS label, NULL::VARCHAR AS description, " + f"NULL::VARCHAR[] AS place_name, NULL::TIMESTAMP AS result_time, " + f"NULL::BIGINT[] AS p__has_material_category, NULL::BIGINT[] AS p__has_sample_object_type, " + f"NULL::BIGINT[] AS p__has_context_category, NULL::BIGINT[] AS p__produced_by, " + f"NULL::BIGINT[] AS p__sample_location, NULL::BIGINT[] AS 
p__sampling_site, " + f"NULL::BIGINT[] AS p__site_location, NULL::BIGINT[] AS p__registrant, " + f"NULL::BIGINT[] AS p__keywords, NULL::BIGINT[] AS p__responsibility, " + f"NULL::INTEGER[] AS p__curation, NULL::BIGINT[] AS p__related_resource, " + f"NULL::VARCHAR AS thumbnail_url, NULL::VARCHAR AS scheme_name, NULL::VARCHAR AS scheme_uri" + ) + + # MSR rows + for m in msr_rows: + lat = m.get("latitude") + lon = m.get("longitude") + if lat is not None and lon is not None: + geom_expr = f"ST_AsWKB(ST_Point({lon}, {lat}))::BLOB" + else: + geom_expr = "NULL::BLOB" + lat_expr = f"{lat}::DOUBLE" if lat is not None else "NULL::DOUBLE" + lon_expr = f"{lon}::DOUBLE" if lon is not None else "NULL::DOUBLE" + pid = m['pid'] + n_val = m.get('n', 'OPENCONTEXT') + rows.append( + f"SELECT {m['row_id']}::BIGINT, '{pid}', 'MaterialSampleRecord', " + f"'{n_val}'::VARCHAR, " + f"{geom_expr}, {lat_expr}, {lon_expr}, " + f"'label {pid}', 'desc {pid}', " + f"['place1']::VARCHAR[], NULL::TIMESTAMP, " + f"{_arr(m.get('p__has_material_category'))}, " + f"{_arr(m.get('p__has_sample_object_type'))}, " + f"{_arr(m.get('p__has_context_category'))}, " + f"{_arr(m.get('p__produced_by'))}, " + f"NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], " + f"{_arr(m.get('p__registrant'))}, " + f"NULL::BIGINT[], NULL::BIGINT[], NULL::INTEGER[], NULL::BIGINT[], " + f"NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR" + ) + + # SE rows + for rid, pid, sample_loc, sampling_site in (se_rows or []): + rows.append( + f"SELECT {rid}::BIGINT, '{pid}', 'SamplingEvent', NULL::VARCHAR, " + f"NULL::BLOB, NULL::DOUBLE, NULL::DOUBLE, NULL, NULL, NULL::VARCHAR[], NULL::TIMESTAMP, " + f"NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], " + f"{_arr(sample_loc)}, {_arr(sampling_site)}, NULL::BIGINT[], " + f"NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::INTEGER[], NULL::BIGINT[], " + f"NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR" + ) + + # Geo rows + for rid, pid, lat, lon in (geo_rows or []): + rows.append( + 
f"SELECT {rid}::BIGINT, '{pid}', 'GeospatialCoordLocation', NULL::VARCHAR, " + f"ST_AsWKB(ST_Point({lon}, {lat}))::BLOB, {lat}::DOUBLE, {lon}::DOUBLE, " + f"NULL, NULL, NULL::VARCHAR[], NULL::TIMESTAMP, " + f"NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], " + f"NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], " + f"NULL::BIGINT[], NULL::BIGINT[], NULL::INTEGER[], NULL::BIGINT[], " + f"NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR" + ) + + # SamplingSite rows + for rid, pid, site_loc in (site_rows or []): + rows.append( + f"SELECT {rid}::BIGINT, '{pid}', 'SamplingSite', NULL::VARCHAR, " + f"NULL::BLOB, NULL::DOUBLE, NULL::DOUBLE, NULL, NULL, NULL::VARCHAR[], NULL::TIMESTAMP, " + f"NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], " + f"NULL::BIGINT[], NULL::BIGINT[], {_arr(site_loc)}, " + f"NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::INTEGER[], NULL::BIGINT[], " + f"NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR" + ) + + # Agent rows + for rid, pid in (agent_rows or []): + rows.append( + f"SELECT {rid}::BIGINT, '{pid}', 'Agent', NULL::VARCHAR, " + f"NULL::BLOB, NULL::DOUBLE, NULL::DOUBLE, NULL, NULL, NULL::VARCHAR[], NULL::TIMESTAMP, " + f"NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], " + f"NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], " + f"NULL::BIGINT[], NULL::BIGINT[], NULL::INTEGER[], NULL::BIGINT[], " + f"NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR" + ) + + # Extra rows (raw SQL) + if extra_rows: + rows.extend(extra_rows) + + con.execute(f"COPY ({' UNION ALL '.join(rows)}) TO '{path}' (FORMAT PARQUET)") + con.close() + + +def build_oc_wide(path, *, msr_rows, concept_rows, se_rows, geo_rows, + site_rows=None, agent_rows=None): + """Build a minimal OC wide parquet in Eric's schema (INTEGER row_id, GEOMETRY geometry). 
+ + geo_rows: list of (row_id, pid, latitude, longitude) — geometry stored as GEOMETRY type + """ + con = duckdb.connect() + con.execute("INSTALL spatial; LOAD spatial;") + + def _arr(xs, t="INTEGER[]"): + if xs is None: + return f"NULL::{t}" + return "[" + ",".join(str(x) for x in xs) + f"]::{t}" + + rows = [] + + # Concept rows + for rid, uri, label in concept_rows: + rows.append( + f"SELECT {rid}::INTEGER AS row_id, '{uri}' AS pid, 'IdentifiedConcept' AS otype, " + f"NULL::VARCHAR AS n, NULL::GEOMETRY AS geometry, NULL::DOUBLE AS latitude, " + f"NULL::DOUBLE AS longitude, {repr(label)}::VARCHAR AS label, " + f"NULL::VARCHAR AS description, NULL::VARCHAR[] AS place_name, NULL::TIMESTAMP AS result_time, " + f"NULL::INTEGER[] AS p__has_material_category, NULL::INTEGER[] AS p__has_sample_object_type, " + f"NULL::INTEGER[] AS p__has_context_category, NULL::INTEGER[] AS p__produced_by, " + f"NULL::INTEGER[] AS p__sample_location, NULL::INTEGER[] AS p__sampling_site, " + f"NULL::INTEGER[] AS p__site_location, NULL::INTEGER[] AS p__registrant, " + f"NULL::INTEGER[] AS p__keywords, NULL::INTEGER[] AS p__responsibility, " + f"NULL::VARCHAR AS thumbnail_url, NULL::VARCHAR AS scheme_name, NULL::VARCHAR AS scheme_uri" + ) + + # MSR rows (no geometry on MSR in Eric's wide) + for m in msr_rows: + pid = m['pid'] + rows.append( + f"SELECT {m['row_id']}::INTEGER, '{pid}', 'MaterialSampleRecord', " + f"NULL::VARCHAR, " # n is NULL in Eric's wide + f"NULL::GEOMETRY, NULL::DOUBLE, NULL::DOUBLE, " + f"'label {pid}', 'desc {pid}', " + f"['place1']::VARCHAR[], NULL::TIMESTAMP, " + f"{_arr(m.get('p__has_material_category'))}, " + f"{_arr(m.get('p__has_sample_object_type'))}, " + f"{_arr(m.get('p__has_context_category'))}, " + f"{_arr(m.get('p__produced_by'))}, " + f"NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], " + f"{_arr(m.get('p__registrant'))}, " + f"{_arr(m.get('p__keywords'))}, NULL::INTEGER[], " + f"NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR" + ) + + # SE rows + for rid, 
pid, sample_loc, sampling_site in (se_rows or []): + rows.append( + f"SELECT {rid}::INTEGER, '{pid}', 'SamplingEvent', NULL::VARCHAR, " + f"NULL::GEOMETRY, NULL::DOUBLE, NULL::DOUBLE, NULL, NULL, NULL::VARCHAR[], NULL::TIMESTAMP, " + f"NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], " + f"{_arr(sample_loc)}, {_arr(sampling_site)}, NULL::INTEGER[], " + f"NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], " + f"NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR" + ) + + # Geo rows (GEOMETRY type in Eric's wide) + for rid, pid, lat, lon in (geo_rows or []): + rows.append( + f"SELECT {rid}::INTEGER, '{pid}', 'GeospatialCoordLocation', NULL::VARCHAR, " + f"ST_Point({lon}, {lat})::GEOMETRY, {lat}::DOUBLE, {lon}::DOUBLE, " + f"NULL, NULL, NULL::VARCHAR[], NULL::TIMESTAMP, " + f"NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], " + f"NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], " + f"NULL::INTEGER[], NULL::INTEGER[], " + f"NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR" + ) + + # SamplingSite rows + for rid, pid, site_loc in (site_rows or []): + rows.append( + f"SELECT {rid}::INTEGER, '{pid}', 'SamplingSite', NULL::VARCHAR, " + f"NULL::GEOMETRY, NULL::DOUBLE, NULL::DOUBLE, NULL, NULL, NULL::VARCHAR[], NULL::TIMESTAMP, " + f"NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], " + f"NULL::INTEGER[], NULL::INTEGER[], {_arr(site_loc)}, " + f"NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], " + f"NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR" + ) + + # Agent rows + for rid, pid in (agent_rows or []): + rows.append( + f"SELECT {rid}::INTEGER, '{pid}', 'Agent', NULL::VARCHAR, " + f"NULL::GEOMETRY, NULL::DOUBLE, NULL::DOUBLE, NULL, NULL, NULL::VARCHAR[], NULL::TIMESTAMP, " + f"NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], " + f"NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], NULL::INTEGER[], " + f"NULL::INTEGER[], NULL::INTEGER[], " + f"NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR" + ) + + 
con.execute(f"COPY ({' UNION ALL '.join(rows)}) TO '{path}' (FORMAT PARQUET)") + con.close() + + +def run_ingest(src, oc, out, extra_args=None): + cmd = [sys.executable, INGEST, "--src", src, "--oc-wide", oc, "--out", out, + "--no-manifest"] + if extra_args: + cmd.extend(extra_args) + return subprocess.run(cmd, capture_output=True, text=True) + + +def count_otype(path, otype): + con = duckdb.connect() + n = con.sql(f"SELECT COUNT(*) FROM read_parquet('{path}') WHERE otype='{otype}'").fetchone()[0] + con.close() + return n + + +def get_msr(path, pid): + con = duckdb.connect() + r = con.sql(f"SELECT * FROM read_parquet('{path}') WHERE pid='{pid}' AND otype='MaterialSampleRecord'").fetchone() + desc = con.sql(f"DESCRIBE SELECT * FROM read_parquet('{path}')").fetchall() + cols = [d[0] for d in desc] + con.close() + if r is None: + return None + return dict(zip(cols, r)) + + +# ---- shared fixture --------------------------------------------------------- + +# Concept IDs in src space (BIGINT) +SRC_ROOT_CONCEPT_ID = 1 +SRC_ROCK_CONCEPT_ID = 2 +SRC_ARTIFACT_CONCEPT_ID = 3 + +SRC_CONCEPT_ROWS = [ + (SRC_ROOT_CONCEPT_ID, ROOT_MAT), + (SRC_ROCK_CONCEPT_ID, MAT + "rock"), + (SRC_ARTIFACT_CONCEPT_ID, OBJ + "artifact"), +] + +# OC concept IDs in Eric's space (INTEGER) +OC_ROOT_CONCEPT_ID = 901 +OC_ROCK_CONCEPT_ID = 902 +OC_ARTIFACT_CONCEPT_ID = 903 +OC_EARTH_CONCEPT_ID = 904 # earthsurface — not yet in src + +OC_CONCEPT_ROWS = [ + (OC_ROOT_CONCEPT_ID, ROOT_MAT, "Material"), + (OC_ROCK_CONCEPT_ID, MAT + "rock", "Rock"), + (OC_ARTIFACT_CONCEPT_ID, OBJ + "artifact", "Artifact"), + (OC_EARTH_CONCEPT_ID, SF + "earthsurface", "Earth Surface"), +] + +# Eric's subgraph: 3 SEs, 3 Geos, 2 sites +# New pids MSR: pid-A (se=101->geo=201), pid-B (se=102->geo=202, site=301->geo_site=211) +# Removed pids: pid-C (se=103->geo=203) — in src, not in Eric's → stale +OC_SE_ROWS = [ + (101, "se-pid-A", [201], None), # SE for pid-A + (102, "se-pid-B", [202], [301]), # SE for pid-B (with sampling site) 
+] +OC_SITE_ROWS = [ + (301, "site-pid-B", [211]), # SamplingSite for pid-B, geo=211 +] +OC_GEO_ROWS = [ + (201, "geo-pid-A", 45.0, 10.0), + (202, "geo-pid-B", 50.0, 15.0), + (211, "geo-site-B", 50.1, 15.1), # geo from SamplingSite for pid-B +] +OC_MSR_ROWS = [ + {"row_id": 1, "pid": "pid-A", "p__produced_by": [101], + "p__has_material_category": [OC_ROCK_CONCEPT_ID], + "p__has_sample_object_type": [OC_ARTIFACT_CONCEPT_ID], + "p__has_context_category": [OC_EARTH_CONCEPT_ID]}, # earthsurface to be minted + {"row_id": 2, "pid": "pid-B", "p__produced_by": [102], + "p__has_material_category": [OC_ROOT_CONCEPT_ID], + "p__has_sample_object_type": [OC_ARTIFACT_CONCEPT_ID], + "p__has_context_category": None}, +] + +# src wide: has pid-C (stale — not in Eric's), + all existing entities +# pid-C: se_id=103, geo_id=203 — both orphans (not shared with any surviving MSR) +SRC_SE_ROWS = [ + # pid-C's SE (will become orphan) + (103, "se-pid-C", [203], None), +] +SRC_GEO_ROWS = [ + (203, "geo-pid-C", 60.0, 20.0), # orphan geo for pid-C +] +SRC_MSR_ROWS = [ + {"row_id": 1000, "pid": "pid-C", "n": "OPENCONTEXT", + "p__produced_by": [103], + "p__has_material_category": [SRC_ROCK_CONCEPT_ID], + "p__has_sample_object_type": [SRC_ARTIFACT_CONCEPT_ID], + "latitude": 60.0, "longitude": 20.0}, + # non-OC MSR — must survive unchanged + {"row_id": 1001, "pid": "pid-NON-OC", "n": "SESAR", + "p__has_material_category": [SRC_ROCK_CONCEPT_ID], + "latitude": 55.0, "longitude": 25.0}, +] +SRC_AGENT_ROWS = [(500, "agent-existing")] # pre-existing agent + + +@pytest.fixture +def pair(tmp_path): + """Canonical 3-MSR fixture: pid-A (new), pid-B (new), pid-C (stale/removed).""" + src = str(tmp_path / "src.parquet") + oc = str(tmp_path / "oc.parquet") + out = str(tmp_path / "out.parquet") + + build_src_wide( + src, + msr_rows=SRC_MSR_ROWS, + concept_rows=SRC_CONCEPT_ROWS, + se_rows=SRC_SE_ROWS, + geo_rows=SRC_GEO_ROWS, + agent_rows=SRC_AGENT_ROWS, + ) + build_oc_wide( + oc, + msr_rows=OC_MSR_ROWS, + 
concept_rows=OC_CONCEPT_ROWS, + se_rows=OC_SE_ROWS, + geo_rows=OC_GEO_ROWS, + site_rows=OC_SITE_ROWS, + ) + return src, oc, out + + +# ---- tests ------------------------------------------------------------------ + +def test_new_pids_ingested(pair): + """pid-A and pid-B (new) are present in output.""" + src, oc, out = pair + r = run_ingest(src, oc, out) + assert r.returncode == 0, r.stderr + r.stdout + assert get_msr(out, "pid-A") is not None, "pid-A missing from output" + assert get_msr(out, "pid-B") is not None, "pid-B missing from output" + + +def test_stale_pid_removed(pair): + """pid-C (stale) is NOT in output.""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + assert get_msr(out, "pid-C") is None, "pid-C (stale) should have been removed" + + +def test_orphan_subgraph_entities_removed(pair): + """se-pid-C and geo-pid-C (orphans) are NOT in output.""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + con = duckdb.connect() + n_se = con.sql(f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='se-pid-C'").fetchone()[0] + n_geo = con.sql(f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='geo-pid-C'").fetchone()[0] + con.close() + assert n_se == 0, "orphan SE se-pid-C should have been removed" + assert n_geo == 0, "orphan geo geo-pid-C should have been removed" + + +def test_non_oc_rows_survive_unchanged(pair): + """pid-NON-OC (SESAR) is present and byte-identical.""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + r_src = get_msr(src, "pid-NON-OC") + r_out = get_msr(out, "pid-NON-OC") + assert r_out is not None, "non-OC MSR should survive" + assert r_src["pid"] == r_out["pid"] + assert r_src["n"] == r_out["n"] + assert r_src["latitude"] == r_out["latitude"] + assert r_src["longitude"] == r_out["longitude"] + + +def test_geometry_denormalized_onto_new_msr(pair): + """New MSR pid-A gets lat/lon from linked GeoCoordLoc (via SE).""" + src, oc, out = pair + assert run_ingest(src, oc, 
out).returncode == 0 + r = get_msr(out, "pid-A") + assert r is not None + assert abs(r["latitude"] - 45.0) < 1e-5, f"lat wrong: {r['latitude']}" + assert abs(r["longitude"] - 10.0) < 1e-5, f"lon wrong: {r['longitude']}" + assert r["geometry"] is not None, "geometry should be non-null (WKB BLOB)" + + +def test_n_column_set_on_new_msrs(pair): + """New MSR rows have n='OPENCONTEXT'.""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + assert get_msr(out, "pid-A")["n"] == "OPENCONTEXT" + assert get_msr(out, "pid-B")["n"] == "OPENCONTEXT" + + +def test_p_array_remapped_to_output_id_space(pair): + """p__produced_by on new MSR pid-A resolves to a SE row in the output.""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + con = duckdb.connect() + # Find the SE row_id stored in pid-A's p__produced_by + pb = con.sql(f""" + SELECT p__produced_by[1] FROM read_parquet('{out}') + WHERE pid='pid-A' AND otype='MaterialSampleRecord' + """).fetchone()[0] + # Verify that row_id exists in the output as a SamplingEvent + se_exists = con.sql(f""" + SELECT COUNT(*) FROM read_parquet('{out}') + WHERE row_id = {pb} AND otype='SamplingEvent' + """).fetchone()[0] + con.close() + assert pb is not None, "p__produced_by must be non-null" + assert se_exists == 1, f"SE row_id {pb} not found in output" + + +def test_concept_remap_via_uri_lookup(pair): + """p__has_material_category on new MSR pid-A resolves to the src 'rock' concept row.""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + con = duckdb.connect() + # Get the concept row_id used by pid-A in the output + mat_rid = con.sql(f""" + SELECT p__has_material_category[1] FROM read_parquet('{out}') + WHERE pid='pid-A' AND otype='MaterialSampleRecord' + """).fetchone()[0] + # Verify it resolves to the 'rock' URI + rock_uri = con.sql(f""" + SELECT pid FROM read_parquet('{out}') + WHERE row_id = {mat_rid} AND otype='IdentifiedConcept' + """).fetchone()[0] + con.close() + 
assert rock_uri == MAT + "rock", f"concept URI wrong: {rock_uri}" + + +def test_minted_concept_earthsurface(pair): + """earthsurface concept is minted when absent from src (referenced by pid-A's context).""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + con = duckdb.connect() + n = con.sql(f""" + SELECT COUNT(*) FROM read_parquet('{out}') + WHERE otype='IdentifiedConcept' AND pid='{SF}earthsurface' + """).fetchone()[0] + con.close() + assert n == 1, "earthsurface concept should be minted in output" + + +def test_sampling_site_ingested_for_pid_b(pair): + """SamplingSite row (site-pid-B) is present in output for pid-B's chain.""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + con = duckdb.connect() + n = con.sql(f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='site-pid-B'").fetchone()[0] + con.close() + assert n == 1, "SamplingSite site-pid-B should be in output" + + +def test_row_count_arithmetic(pair): + """Output row count = (src - removed) + new_entities + minted_concepts.""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + con = duckdb.connect() + n_src = con.sql(f"SELECT COUNT(*) FROM read_parquet('{src}')").fetchone()[0] + n_out = con.sql(f"SELECT COUNT(*) FROM read_parquet('{out}')").fetchone()[0] + # Removed: 1 MSR (pid-C) + 1 SE (se-pid-C) + 1 geo (geo-pid-C) = 3 rows + # New entities: 2 MSR + 2 SE + 3 Geo (201, 202, 211) + 1 Site = 8 rows + # Minted: 1 (earthsurface) + # Expected: n_src - 3 + 8 + 1 + expected = n_src - 3 + 8 + 1 + con.close() + assert n_out == expected, f"row count {n_out} != expected {expected}" + + +def test_no_duplicate_row_ids(pair): + """Output has no duplicate row_ids.""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + con = duckdb.connect() + n_dup = con.sql(f""" + SELECT COUNT(*) FROM ( + SELECT row_id FROM read_parquet('{out}') + GROUP BY row_id HAVING COUNT(*) > 1 + ) + """).fetchone()[0] + con.close() + assert n_dup 
== 0, f"{n_dup} duplicate row_ids in output" + + +def test_no_duplicate_msr_pids(pair): + """Output has no duplicate MSR pids.""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + con = duckdb.connect() + n_dup = con.sql(f""" + SELECT COUNT(*) FROM ( + SELECT pid FROM read_parquet('{out}') WHERE otype='MaterialSampleRecord' + GROUP BY pid HAVING COUNT(*) > 1 + ) + """).fetchone()[0] + con.close() + assert n_dup == 0, f"{n_dup} duplicate MSR pids in output" + + +def test_determinism_bit_identical(pair, tmp_path): + """Same inputs → bit-identical outputs (--no-manifest suppresses timestamp drift).""" + src, oc, out = pair + out2 = str(tmp_path / "out2.parquet") + assert run_ingest(src, oc, out).returncode == 0 + assert run_ingest(src, oc, out2).returncode == 0 + h = lambda p: hashlib.sha256(open(p, "rb").read()).hexdigest() + assert h(out) == h(out2), "outputs not bit-identical across two runs with same inputs" + + +def test_dry_run_produces_no_output(pair): + """--dry-run exits 0 but does NOT write the output file.""" + src, oc, out = pair + r = run_ingest(src, oc, out, extra_args=["--dry-run"]) + assert r.returncode == 0, r.stderr + r.stdout + assert not os.path.exists(out), "--dry-run should not write output" + assert "DRY RUN" in r.stdout + + +def test_hard_fail_on_duplicate_oc_pids(pair, tmp_path): + """Eric's wide with duplicate MSR pids triggers a hard failure.""" + src, oc, out = pair + dup_oc = str(tmp_path / "oc_dup.parquet") + # Build OC with pid-A appearing twice + build_oc_wide( + dup_oc, + msr_rows=OC_MSR_ROWS + [{"row_id": 99, "pid": "pid-A", + "p__produced_by": [101], + "p__has_material_category": [OC_ROCK_CONCEPT_ID]}], + concept_rows=OC_CONCEPT_ROWS, + se_rows=OC_SE_ROWS, + geo_rows=OC_GEO_ROWS, + ) + r = run_ingest(src, dup_oc, out) + assert r.returncode != 0, "should fail on duplicate OC pids" + assert "duplicate" in (r.stderr + r.stdout).lower() + assert not os.path.exists(out) + + +def 
test_hard_fail_new_pid_already_in_src(tmp_path): + """A 'new' pid already in src as non-OC row triggers a hard failure.""" + src = str(tmp_path / "src.parquet") + oc = str(tmp_path / "oc.parquet") + out = str(tmp_path / "out.parquet") + + # Build src with pid-A as SESAR (not OC) + pid-C as OC (will be "removed") + build_src_wide( + src, + msr_rows=[ + {"row_id": 1000, "pid": "pid-A", "n": "SESAR", + "p__has_material_category": [SRC_ROCK_CONCEPT_ID], + "latitude": 45.0, "longitude": 10.0}, + {"row_id": 1001, "pid": "pid-C", "n": "OPENCONTEXT", + "p__produced_by": [103], + "p__has_material_category": [SRC_ROCK_CONCEPT_ID]}, + ], + concept_rows=SRC_CONCEPT_ROWS, + se_rows=[(103, "se-pid-C", [203], None)], + geo_rows=[(203, "geo-pid-C", 60.0, 20.0)], + ) + build_oc_wide( + oc, + msr_rows=OC_MSR_ROWS, # pid-A is "new" from OC's perspective + concept_rows=OC_CONCEPT_ROWS, + se_rows=OC_SE_ROWS, + geo_rows=OC_GEO_ROWS, + ) + r = run_ingest(src, oc, out) + assert r.returncode != 0, "should fail — pid-A is 'new' from OC but already in src as SESAR" + assert not os.path.exists(out) + + +def test_removal_only_removes_oc_entities(pair): + """The non-OC MSR (pid-NON-OC) and its row are not in the removal set.""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + # non-OC row must still be in output + r = get_msr(out, "pid-NON-OC") + assert r is not None, "non-OC MSR should not be removed" + assert r["n"] == "SESAR" + + +def test_new_row_ids_no_collision_with_src(pair): + """All new row_ids are strictly greater than max(src.row_id).""" + src, oc, out = pair + assert run_ingest(src, oc, out).returncode == 0 + con = duckdb.connect() + max_src = con.sql(f"SELECT MAX(row_id) FROM read_parquet('{src}')").fetchone()[0] + # New rows start at max_src+1 — get all rows NOT in src + src_ids = set(r[0] for r in con.sql(f"SELECT row_id FROM read_parquet('{src}')").fetchall()) + out_ids = set(r[0] for r in con.sql(f"SELECT row_id FROM read_parquet('{out}')").fetchall()) 
+    new_ids = out_ids - src_ids
+    con.close()
+    # Removed rows are also gone; new ids are all > max_src
+    if new_ids:
+        assert min(new_ids) > max_src, f"New ids start at {min(new_ids)}, but max_src={max_src}"
+
+
+def test_refuses_to_overwrite_input(pair):
+    """--out same as --src triggers a hard failure."""
+    src, oc, _ = pair
+    r = run_ingest(src, oc, src)
+    combined = r.stderr + r.stdout
+    assert r.returncode != 0, "ingest must exit non-zero when --out equals --src"
+    assert "overwrite" in combined.lower(), (
+        f"Expected an 'overwrite' refusal message; got:\n{combined[:500]}"
+    )
+
+
+# ============================================================================
+# Fix #277 — OC description enrichment
+# ============================================================================
+
+def test_oc_description_enriched_from_eric_wide(pair):
+    """OC MSR pid-A gets its description from Eric's OC wide after ingestion.
+
+    The src wide stores 'desc pid-A' (a placeholder). Eric's wide also stores
+    'desc pid-A' by default from build_oc_wide(). We override pid-A's description
+    in Eric's wide to a realistic site-path string and verify the output carries
+    that enriched value, not the src placeholder.
+    """
+    src, oc, out = pair
+
+    # Patch Eric's wide to have a realistic description for pid-A.
+    # The patched copy is placed next to the output file. Deriving the path from
+    # the directory (rather than str.replace on the file name) guarantees it can
+    # never collide with `out`, whatever the fixture names its output.
+    oc_patched = os.path.join(os.path.dirname(out), "oc_patched.parquet")
+    con = duckdb.connect()
+    try:
+        con.execute("INSTALL spatial; LOAD spatial;")
+        # Read Eric's wide, override pid-A's label/description, rewrite.
+        con.execute(f"""
+            COPY (
+                SELECT
+                    row_id, pid, otype, n, geometry, latitude, longitude,
+                    CASE WHEN pid='pid-A' AND otype='MaterialSampleRecord'
+                         THEN 'Open Context published "Sample" from: Europe/Cyprus/PKAP Survey Area/Unit 42'
+                         ELSE label
+                    END AS label,
+                    CASE WHEN pid='pid-A' AND otype='MaterialSampleRecord'
+                         THEN 'Open Context published "Sample" from: Europe/Cyprus/PKAP Survey Area/Unit 42'
+                         ELSE description
+                    END AS description,
+                    place_name, result_time, p__has_material_category, p__has_sample_object_type,
+                    p__has_context_category, p__produced_by, p__sample_location, p__sampling_site,
+                    p__site_location, p__registrant, p__keywords, p__responsibility,
+                    thumbnail_url, scheme_name, scheme_uri
+                FROM read_parquet('{oc}')
+            ) TO '{oc_patched}' (FORMAT PARQUET)
+        """)
+    finally:
+        con.close()
+
+    r = run_ingest(src, oc_patched, out)
+    assert r.returncode == 0, r.stderr + r.stdout
+
+    row = get_msr(out, "pid-A")
+    assert row is not None
+    assert "Cyprus" in row["description"], (
+        f"Expected enriched description with 'Cyprus', got: {row['description']!r}"
+    )
+
+
+def test_non_oc_description_unchanged_by_enrichment(pair):
+    """Non-OC MSR (pid-NON-OC) description is not overwritten by the OC enrichment."""
+    src, oc, out = pair
+    r = run_ingest(src, oc, out)
+    assert r.returncode == 0, r.stderr + r.stdout
+
+    src_row = get_msr(src, "pid-NON-OC")
+    out_row = get_msr(out, "pid-NON-OC")
+    assert src_row is not None, "fixture is expected to contain pid-NON-OC in the src wide"
+    assert out_row is not None
+    # Non-OC rows must have same description as in src (enrichment must not touch them)
+    assert out_row["description"] == src_row["description"], (
+        f"Non-OC description changed: {src_row['description']!r} → {out_row['description']!r}"
+    )
+
+
+def test_oc_msr_count_unchanged_by_enrichment(pair):
+    """Description enrichment does not change the OC MSR row count."""
+    src, oc, out = pair
+    r = run_ingest(src, oc, out)
+    assert r.returncode == 0, r.stderr + r.stdout
+
+    # One connection is enough for all three scalar queries.
+    con = duckdb.connect()
+    try:
+        n_total = con.sql(f"SELECT COUNT(*) FROM read_parquet('{out}')").fetchone()[0]
+        n_oc_msr = con.sql(f"""
+            SELECT COUNT(*) FROM read_parquet('{out}')
+            WHERE otype='MaterialSampleRecord' AND n='OPENCONTEXT'
+        """).fetchone()[0]
+        n_src = con.sql(f"SELECT COUNT(*) FROM read_parquet('{src}')").fetchone()[0]
+    finally:
+        con.close()
+
+    # After the sync the OC MSRs are pid-A and pid-B; stale pid-C is removed and
+    # pid-NON-OC belongs to another source → 2 OC MSRs in total.
+    assert n_oc_msr == 2, f"Expected 2 OC MSRs after sync, got {n_oc_msr}"
+    # Row count must match the sync arithmetic (n_src - 3 removed + 8 new + 1 minted)
+    assert n_total == n_src - 3 + 8 + 1, f"Total row count unexpected: {n_total}"
+
+
+# ============================================================================
+# Fix #283a — Empty-string facet filter
+# ============================================================================
+
+def _load_build_frontend_derived():
+    """Import and return scripts/build_frontend_derived.py as a module.
+
+    The scripts directory is added to sys.path only if it is not already there,
+    so repeated calls from several tests do not pile up duplicate entries.
+    """
+    scripts_dir = os.path.join(REPO, "scripts")
+    if scripts_dir not in sys.path:
+        sys.path.insert(0, scripts_dir)
+    import build_frontend_derived
+    return build_frontend_derived
+
+
+def test_empty_string_facet_values_filtered_from_summaries(tmp_path):
+    """build_facet_summaries must not produce rows with facet_value=''.
+
+    This is a synthetic test: we build a tiny samp_geo with an empty-string
+    context value and verify it does NOT appear in facet_summaries output.
+    """
+    B = _load_build_frontend_derived()
+    out = str(tmp_path / "facet_summaries.parquet")
+
+    con = duckdb.connect()
+    try:
+        con.execute("INSTALL h3 FROM community; LOAD h3; INSTALL spatial; LOAD spatial;")
+        # Create a synthetic samp_geo with an empty-string context and a real one
+        con.execute("""
+            CREATE OR REPLACE TEMP TABLE samp_geo AS
+            SELECT 'pid1' AS pid, 'GEOME' AS source,
+                   'https://w3id.org/isample/vocabulary/material/1.0/rock' AS material,
+                   '' AS context,  -- empty-string concept URI (the bug scenario)
+                   'https://w3id.org/isample/vocabulary/materialsampleobjecttype/1.0/artifact' AS object_type,
+                   'label1' AS label, 'desc1' AS description,
+                   NULL::VARCHAR AS place_name, NULL::TIMESTAMP AS result_time,
+                   10.0::DOUBLE AS latitude, 45.0::DOUBLE AS longitude,
+                   1::UBIGINT AS h3_res4, 2::UBIGINT AS h3_res6, 3::UBIGINT AS h3_res8
+            UNION ALL
+            SELECT 'pid2', 'GEOME',
+                   'https://w3id.org/isample/vocabulary/material/1.0/rock',
+                   'https://w3id.org/isample/vocabulary/sampledfeature/1.0/earthsurface',
+                   'https://w3id.org/isample/vocabulary/materialsampleobjecttype/1.0/artifact',
+                   'label2', 'desc2', NULL, NULL, 11.0, 46.0, 1, 2, 3
+        """)
+
+        B.build_facet_summaries(con, out)
+
+        rows = con.sql(f"SELECT * FROM read_parquet('{out}') WHERE facet_value = ''").fetchall()
+        real_rows = con.sql(
+            f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE facet_type='context' AND facet_value != ''"
+        ).fetchone()[0]
+    finally:
+        # The original never closed this connection.
+        con.close()
+
+    assert rows == [], (
+        f"Expected no blank facet_value rows, but got: {rows}"
+    )
+    # Real context value should appear
+    assert real_rows >= 1, "Expected at least one non-blank context facet row"
+
+
+def test_empty_string_facet_values_filtered_from_cross_filter(tmp_path):
+    """build_facet_cross_filter must not produce rows with blank facet_value."""
+    B = _load_build_frontend_derived()
+    out = str(tmp_path / "facet_cross_filter.parquet")
+
+    con = duckdb.connect()
+    try:
+        con.execute("INSTALL h3 FROM community; LOAD h3; INSTALL spatial; LOAD spatial;")
+        con.execute("""
+            CREATE OR REPLACE TEMP TABLE samp_geo AS
+            SELECT 'pid1' AS pid, 'GEOME' AS source,
+                   'https://w3id.org/isample/vocabulary/material/1.0/rock' AS material,
+                   '' AS context,
+                   'https://w3id.org/isample/vocabulary/materialsampleobjecttype/1.0/artifact' AS object_type,
+                   'label1' AS label, 'desc1' AS description,
+                   NULL::VARCHAR AS place_name, NULL::TIMESTAMP AS result_time,
+                   10.0::DOUBLE AS latitude, 45.0::DOUBLE AS longitude,
+                   1::UBIGINT AS h3_res4, 2::UBIGINT AS h3_res6, 3::UBIGINT AS h3_res8
+        """)
+
+        B.build_facet_cross_filter(con, out)
+
+        blank_rows = con.sql(
+            f"SELECT * FROM read_parquet('{out}') WHERE facet_value = ''"
+        ).fetchall()
+        blank_filter_rows = con.sql(
+            f"SELECT * FROM read_parquet('{out}') WHERE filter_context = ''"
+        ).fetchall()
+    finally:
+        # The original never closed this connection.
+        con.close()
+
+    assert blank_rows == [], (
+        f"Expected no blank facet_value in cross_filter, got: {blank_rows}"
+    )
+    assert blank_filter_rows == [], (
+        f"Expected no blank filter_context in cross_filter, got: {blank_filter_rows}"
+    )
+
+
+# ============================================================================
+# Fix #283b — specimentype/1.0 vocab labels
+# ============================================================================
+
+SPEC_URI_SOLID = "https://w3id.org/isample/vocabulary/specimentype/1.0/othersolidobject"
+SPEC_URI_PHYS = "https://w3id.org/isample/vocabulary/specimentype/1.0/physicalspecimen"
+
+# Optional fast-path: if ISAMPLES_VOCAB_LABELS points at an already-built
+# vocab_labels.parquet, reuse it; otherwise (CI / fresh checkout) we rebuild
+# it on the fly. No machine-specific default — avoids leaking a local path.
+VOCAB_LABELS_PATH = os.environ.get("ISAMPLES_VOCAB_LABELS", "")
+
+# Memo for _get_vocab_labels_parquet(): holds "path" after a successful build
+# or "skip" (the skip reason) after a failed one, so the build subprocess runs
+# at most once per test session instead of once per test.
+_VOCAB_LABELS_BUILD = {}
+
+
+def _get_vocab_labels_parquet():
+    """Return a path to vocab_labels.parquet, building it if needed.
+
+    Uses ISAMPLES_VOCAB_LABELS when it names an existing file. Otherwise runs
+    scripts/build_vocab_labels.py once into a private temporary directory and
+    reuses the result for later calls. If the build fails, the calling test is
+    skipped (and so is every later caller, without retrying the build).
+    """
+    if VOCAB_LABELS_PATH and os.path.exists(VOCAB_LABELS_PATH):
+        return VOCAB_LABELS_PATH
+    if "skip" in _VOCAB_LABELS_BUILD:
+        pytest.skip(_VOCAB_LABELS_BUILD["skip"])
+    if "path" in _VOCAB_LABELS_BUILD:
+        return _VOCAB_LABELS_BUILD["path"]
+
+    import atexit
+    import shutil
+    import tempfile
+
+    # tempfile.mktemp() is deprecated and race-prone: it only returns a name.
+    # mkdtemp() creates a directory owned by this process, and the builder writes
+    # a fresh file inside it. The directory is removed at interpreter exit.
+    build_dir = tempfile.mkdtemp(prefix="vocab_labels_")
+    atexit.register(shutil.rmtree, build_dir, ignore_errors=True)
+    tmp = os.path.join(build_dir, "vocab_labels.parquet")
+
+    build_vl = os.path.join(REPO, "scripts", "build_vocab_labels.py")
+    result = subprocess.run(
+        [sys.executable, build_vl, "-o", tmp],
+        capture_output=True, text=True
+    )
+    if result.returncode != 0:
+        _VOCAB_LABELS_BUILD["skip"] = (
+            f"build_vocab_labels.py failed (network?): {result.stderr[:200]}"
+        )
+        pytest.skip(_VOCAB_LABELS_BUILD["skip"])
+    _VOCAB_LABELS_BUILD["path"] = tmp
+    return tmp
+
+
+def test_specimentype_othersolidobject_in_vocab_labels():
+    """specimentype/1.0/othersolidobject must be present with label 'Other solid object'."""
+    vl = _get_vocab_labels_parquet()
+    con = duckdb.connect()
+    row = con.sql(
+        f"SELECT pref_label FROM read_parquet('{vl}') WHERE uri='{SPEC_URI_SOLID}'"
+    ).fetchone()
+    con.close()
+    assert row is not None, f"{SPEC_URI_SOLID!r} not found in vocab_labels"
+    assert row[0] == "Other solid object", f"Expected 'Other solid object', got {row[0]!r}"
+
+
+def test_specimentype_physicalspecimen_in_vocab_labels():
+    """specimentype/1.0/physicalspecimen must be present with label 'Material sample'."""
+    vl = _get_vocab_labels_parquet()
+    con = duckdb.connect()
+    row = con.sql(
+        f"SELECT pref_label FROM read_parquet('{vl}') WHERE uri='{SPEC_URI_PHYS}'"
+    ).fetchone()
+    con.close()
+    assert row is not None, f"{SPEC_URI_PHYS!r} not found in vocab_labels"
+    assert row[0] == "Material sample", f"Expected 'Material sample', got {row[0]!r}"
+
+
+def test_specimentype_labels_have_lang_en():
+    """Both specimentype manual overrides must have lang='en'."""
+    vl = _get_vocab_labels_parquet()
+    con = duckdb.connect()
+    rows = con.sql(
+        f"SELECT uri, lang FROM read_parquet('{vl}') WHERE uri LIKE '%specimentype%'"
+    ).fetchall()
+    con.close()
+    assert len(rows) == 2, f"Expected 2 specimentype rows, got {len(rows)}: {rows}"
+    for uri, lang in rows:
+        assert lang == "en", f"Expected lang='en' for {uri!r}, got {lang!r}"
+
+
+# ============================================================================
+# Blocker 1 — cross-source orphan protection (Nit C + Nit D)
+# ============================================================================
+
+def test_cross_source_shared_entity_not_orphaned(tmp_path):
+    """SE and SamplingSite shared between a removed OC MSR and a surviving SESAR MSR
+    must NOT be deleted as orphans.
+
+    Scenario:
+      src:
+        - OC MSR  pid='OC_remove_me'  → SE row_id=100 → SamplingSite row_id=200
+        - SE row_id=100
+        - SamplingSite row_id=200
+        - SESAR MSR pid='SESAR_keep_me' ALSO → SE row_id=100 + SamplingSite row_id=200
+      Eric's OC wide:
+        - pid='NEW_OC_pid' (new OC record, NOT 'OC_remove_me' → it's gone)
+
+    After sync:
+      - 'OC_remove_me' is removed (not in Eric's wide)
+      - SE row_id=100 is STILL referenced by 'SESAR_keep_me' → must survive
+      - SamplingSite row_id=200 is STILL referenced by 'SESAR_keep_me' → must survive
+
+    Old code (surviving_se_refs filtered to n='OPENCONTEXT') would incorrectly
+    mark SE 100 and Site 200 as orphans and delete them, breaking the SESAR MSR.
+    New code (all-source surviving refs) must keep them.
+    """
+    src = str(tmp_path / "src_b1.parquet")
+    oc = str(tmp_path / "oc_b1.parquet")
+    out = str(tmp_path / "out_b1.parquet")
+
+    # row_ids in src space
+    SE_ROW_ID = 100
+    SITE_ROW_ID = 200
+    GEO_ROW_ID = 300
+
+    # Build src wide: OC MSR to-be-removed + SESAR MSR sharing SE+Site
+    build_src_wide(
+        src,
+        msr_rows=[
+            # OC MSR that will be removed (not in Eric's wide)
+            {
+                "row_id": 1000, "pid": "OC_remove_me", "n": "OPENCONTEXT",
+                "p__produced_by": [SE_ROW_ID],
+                "p__has_material_category": [SRC_ROCK_CONCEPT_ID],
+                "latitude": 45.0, "longitude": 10.0,
+            },
+            # SESAR MSR that ALSO references SE 100 + Site 200 (shared!)
+            {
+                "row_id": 1001, "pid": "SESAR_keep_me", "n": "SESAR",
+                "p__produced_by": [SE_ROW_ID],
+                "p__has_material_category": [SRC_ROCK_CONCEPT_ID],
+                "latitude": 45.1, "longitude": 10.1,
+            },
+        ],
+        concept_rows=SRC_CONCEPT_ROWS,
+        se_rows=[
+            # SE shared between OC + SESAR MSRs
+            (SE_ROW_ID, "se-shared", [GEO_ROW_ID], [SITE_ROW_ID]),
+        ],
+        geo_rows=[
+            (GEO_ROW_ID, "geo-shared", 45.0, 10.0),
+        ],
+        site_rows=[
+            (SITE_ROW_ID, "site-shared", [GEO_ROW_ID]),
+        ],
+    )
+
+    # Build Eric's OC wide: NEW_OC_pid (new), NOT OC_remove_me → it becomes stale
+    # Use a simple SE + geo for the new OC record
+    build_oc_wide(
+        oc,
+        msr_rows=[
+            {
+                "row_id": 1, "pid": "NEW_OC_pid",
+                "p__produced_by": [501],
+                "p__has_material_category": [OC_ROCK_CONCEPT_ID],
+            },
+        ],
+        concept_rows=OC_CONCEPT_ROWS,
+        se_rows=[(501, "se-new", [601], None)],
+        geo_rows=[(601, "geo-new", 46.0, 11.0)],
+    )
+
+    r = run_ingest(src, oc, out)
+    assert r.returncode == 0, f"ingest failed:\nSTDERR: {r.stderr}\nSTDOUT: {r.stdout}"
+
+    con = duckdb.connect()
+
+    # SESAR MSR must survive
+    sesar_row = con.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='SESAR_keep_me' AND otype='MaterialSampleRecord'"
+    ).fetchone()[0]
+    assert sesar_row == 1, "SESAR_keep_me MSR should survive — it was not an OC record"
+
+    # OC MSR must be gone
+    oc_row = con.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='OC_remove_me' AND otype='MaterialSampleRecord'"
+    ).fetchone()[0]
+    assert oc_row == 0, "OC_remove_me MSR should have been removed"
+
+    # Shared SE must survive (still referenced by SESAR_keep_me)
+    se_count = con.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='se-shared' AND otype='SamplingEvent'"
+    ).fetchone()[0]
+    assert se_count == 1, (
+        "se-shared (SE row_id=100) must NOT be orphaned — still referenced by SESAR_keep_me"
+    )
+
+    # Shared SamplingSite must survive
+    site_count = con.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='site-shared' AND otype='SamplingSite'"
+    ).fetchone()[0]
+    assert site_count == 1, (
+        "site-shared (SamplingSite row_id=200) must NOT be orphaned — still referenced by SESAR_keep_me's SE"
+    )
+
+    # Shared Geo must survive
+    geo_count = con.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='geo-shared' AND otype='GeospatialCoordLocation'"
+    ).fetchone()[0]
+    assert geo_count == 1, (
+        "geo-shared (Geo row_id=300) must NOT be orphaned — still referenced by the shared SE"
+    )
+
+    # SESAR MSR's p__produced_by must still resolve to a valid SE.
+    # The src fixture gives SESAR_keep_me a non-empty p__produced_by, so a NULL here
+    # means the reference was lost. The original guarded this with
+    # `if pb is not None:`, which let that regression pass silently.
+    pb = con.sql(f"""
+        SELECT p__produced_by[1] FROM read_parquet('{out}')
+        WHERE pid='SESAR_keep_me' AND otype='MaterialSampleRecord'
+    """).fetchone()[0]
+    assert pb is not None, "SESAR_keep_me lost its p__produced_by reference during the sync"
+    se_exists = con.sql(f"""
+        SELECT COUNT(*) FROM read_parquet('{out}')
+        WHERE row_id = {pb} AND otype='SamplingEvent'
+    """).fetchone()[0]
+    assert se_exists == 1, f"SESAR_keep_me p__produced_by row_id={pb} not found in output (dangling ref!)"
+
+    con.close()
+
+
+# ============================================================================
+# Fix #272 Phase 5 — surviving SamplingSite's p__site_location Geo not orphaned
+# ============================================================================
+
+def test_site_location_geo_not_orphaned(tmp_path):
+    """A GeospatialCoordLocation referenced by a surviving SamplingSite via
+    p__site_location must NOT be deleted as an orphan even if it is also
+    referenced by an orphan SamplingEvent via p__sample_location.
+
+    Scenario:
+      src:
+        - OC MSR  pid='OC_removed', n='OPENCONTEXT'
+            → p__produced_by=[10]  (SE row_id=10)
+        - SE row_id=10, p__sample_location=[20], p__sampling_site=[30]
+        - Geo row_id=20
+        - SamplingSite row_id=30, p__site_location=[20]  ← same Geo!
+        - SESAR MSR pid='SESAR_kept', n='SESAR'
+            → p__produced_by=[40]  (SE row_id=40)
+        - SE row_id=40, p__sampling_site=[30]  ← references surviving Site
+
+      Eric's OC wide: does NOT contain 'OC_removed' → it becomes stale
+        - Contains 'NEW_OC_pid' (new)
+
+    After sync:
+      - 'OC_removed' MSR is removed
+      - SE row_id=10 is orphaned (only referenced by removed OC MSR's p__produced_by)
+      - SamplingSite row_id=30 SURVIVES (referenced by SESAR SE row_id=40)
+      - GeospatialCoordLocation row_id=20 SURVIVES (referenced by surviving Site
+        row_id=30 via p__site_location) — this is the critical assertion
+      - Zero dangling p__site_location refs in output
+
+    OLD CODE (BUG): surviving_geo_refs only checked p__sample_location on SEs.
+    Since SE row_id=10 is orphaned, Geo row_id=20 appeared to have NO surviving
+    SE references → incorrectly deleted → dangling p__site_location on Site row_id=30.
+
+    NEW CODE (FIX): surviving_geo_refs also checks p__site_location on non-orphan
+    SamplingSites → Geo row_id=20 is retained.
+    """
+    src = str(tmp_path / "src_sl.parquet")
+    oc = str(tmp_path / "oc_sl.parquet")
+    out = str(tmp_path / "out_sl.parquet")
+
+    GEO_ROW_ID = 20
+    SE_OC_ROW_ID = 10
+    SITE_ROW_ID = 30
+    SE_SESAR_ROW_ID = 40
+
+    build_src_wide(
+        src,
+        msr_rows=[
+            # OC MSR that will be removed (not in Eric's wide)
+            {
+                "row_id": 1000, "pid": "OC_removed", "n": "OPENCONTEXT",
+                "p__produced_by": [SE_OC_ROW_ID],
+                "p__has_material_category": [SRC_ROCK_CONCEPT_ID],
+                "latitude": 45.0, "longitude": 10.0,
+            },
+            # SESAR MSR whose SE references the surviving SamplingSite
+            {
+                "row_id": 1001, "pid": "SESAR_kept", "n": "SESAR",
+                "p__produced_by": [SE_SESAR_ROW_ID],
+                "p__has_material_category": [SRC_ROCK_CONCEPT_ID],
+                "latitude": 45.1, "longitude": 10.1,
+            },
+        ],
+        concept_rows=SRC_CONCEPT_ROWS,
+        se_rows=[
+            # OC's SE: sample_location → Geo 20, sampling_site → Site 30
+            (SE_OC_ROW_ID, "se-oc", [GEO_ROW_ID], [SITE_ROW_ID]),
+            # SESAR's SE: sampling_site → Site 30 (keeps Site alive)
+            (SE_SESAR_ROW_ID, "se-sesar", None, [SITE_ROW_ID]),
+        ],
+        geo_rows=[
+            (GEO_ROW_ID, "geo-shared", 45.0, 10.0),
+        ],
+        site_rows=[
+            # SamplingSite references Geo via p__site_location
+            (SITE_ROW_ID, "site-shared", [GEO_ROW_ID]),
+        ],
+    )
+
+    # Eric's OC wide: NEW_OC_pid only (OC_removed is absent → stale)
+    build_oc_wide(
+        oc,
+        msr_rows=[
+            {
+                "row_id": 1, "pid": "NEW_OC_pid",
+                "p__produced_by": [501],
+                "p__has_material_category": [OC_ROCK_CONCEPT_ID],
+            },
+        ],
+        concept_rows=OC_CONCEPT_ROWS,
+        se_rows=[(501, "se-new-oc", [601], None)],
+        geo_rows=[(601, "geo-new-oc", 46.0, 11.0)],
+    )
+
+    r = run_ingest(src, oc, out)
+    assert r.returncode == 0, f"ingest failed:\nSTDERR: {r.stderr}\nSTDOUT: {r.stdout}"
+
+    con = duckdb.connect()
+
+    # OC_removed must be gone
+    assert con.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='OC_removed'"
+    ).fetchone()[0] == 0, "OC_removed must be removed"
+
+    # SESAR_kept must survive
+    assert con.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='SESAR_kept' AND otype='MaterialSampleRecord'"
+    ).fetchone()[0] == 1, "SESAR_kept must survive"
+
+    # SE row_id=10 (OC's SE) must be orphaned
+    assert con.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='se-oc' AND otype='SamplingEvent'"
+    ).fetchone()[0] == 0, "se-oc (orphan SE) must be removed"
+
+    # SamplingSite row_id=30 must survive (referenced by SESAR SE)
+    assert con.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='site-shared' AND otype='SamplingSite'"
+    ).fetchone()[0] == 1, "site-shared must survive — still referenced by SESAR's SE"
+
+    # GeospatialCoordLocation row_id=20 MUST survive (this is the critical fix assertion)
+    assert con.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='geo-shared' AND otype='GeospatialCoordLocation'"
+    ).fetchone()[0] == 1, (
+        "geo-shared (Geo row_id=20) must NOT be orphaned — "
+        "surviving SamplingSite still references it via p__site_location"
+    )
+
+    # Zero dangling p__site_location refs in output (the production-scale symptom)
+    dangling = con.sql(f"""
+        WITH all_row_ids AS (SELECT row_id FROM read_parquet('{out}')),
+        site_refs AS (
+            SELECT unnest(p__site_location) AS ref_id
+            FROM read_parquet('{out}')
+            WHERE otype='SamplingSite'
+              AND p__site_location IS NOT NULL AND len(p__site_location) > 0
+        )
+        SELECT COUNT(*) FROM site_refs
+        LEFT JOIN all_row_ids ON site_refs.ref_id = all_row_ids.row_id
+        WHERE all_row_ids.row_id IS NULL
+    """).fetchone()[0]
+    assert dangling == 0, f"p__site_location dangling refs: {dangling} (expected 0)"
+
+    con.close()
+
+
+# ============================================================================
+# Fix A — Fixpoint orphan removal (R2-A)
+# ============================================================================
+
+def test_orphan_geo_via_site_only_removed(tmp_path):
+    """A Geo referenced ONLY via an orphan SamplingSite's p__site_location
+    (and NOT by any surviving SE's p__sample_location) must be REMOVED.
+
+    This tests Fix A's over-retention correction: the fixpoint algorithm must
+    not retain a Geo that appears only in an orphan chain with no surviving refs.
+
+    Scenario:
+      src:
+        - OC MSR pid='OC_gone' → SE row_id=10 → Site row_id=20 → Geo row_id=30
+        - SE row_id=10, p__sample_location=[], p__sampling_site=[20]
+          (SE has NO direct p__sample_location — only a site reference)
+        - SamplingSite row_id=20, p__site_location=[30]
+        - Geo row_id=30  ← referenced ONLY via orphan Site, no surviving refs
+        - NO other MSR references SE 10, Site 20, or Geo 30
+
+      Eric's OC wide: does NOT contain 'OC_gone' → stale; has 'NEW_OC_pid'
+
+    After sync:
+      - 'OC_gone' MSR removed
+      - SE row_id=10 is orphan (no surviving MSR's p__produced_by points to it)
+      - SamplingSite row_id=20 is orphan (SE 10 is removed; no other ref)
+      - Geo row_id=30 is orphan (Site 20 is removed; no other ref)
+      → ALL THREE must be REMOVED
+
+    OLD CODE RISK (path-specific logic): surviving_geo_refs included Geos
+    referenced by non-orphan SamplingSites, and a Site's surviving status came
+    from a hand-coded SE→Site chain. If that hand-written traversal missed a
+    path, a Geo could be incorrectly retained.
+
+    FIXPOINT: computes survivor refs from all surviving rows; since no surviving
+    row points to Geo 30, it is removed.
+
+    This is a regression guard rather than a strict red/green test: it already
+    passed on the Phase 5 code that hand-enumerated the site_location path, and
+    it verifies that the fixpoint also handles the fully-orphaned chain.
+    """
+    src = str(tmp_path / "src_chain.parquet")
+    oc = str(tmp_path / "oc_chain.parquet")
+    out = str(tmp_path / "out_chain.parquet")
+
+    SE_ROW = 10
+    SITE_ROW = 20
+    GEO_ROW = 30
+
+    build_src_wide(
+        src,
+        msr_rows=[
+            # OC MSR to be removed — references SE only
+            {
+                "row_id": 1000, "pid": "OC_gone", "n": "OPENCONTEXT",
+                "p__produced_by": [SE_ROW],
+                "p__has_material_category": [SRC_ROCK_CONCEPT_ID],
+                "latitude": 45.0, "longitude": 10.0,
+            },
+        ],
+        concept_rows=SRC_CONCEPT_ROWS,
+        se_rows=[
+            # SE: no direct p__sample_location; only p__sampling_site → Site
+            (SE_ROW, "se-orphan", [], [SITE_ROW]),
+        ],
+        geo_rows=[
+            (GEO_ROW, "geo-orphan", 45.0, 10.0),
+        ],
+        site_rows=[
+            # Site: p__site_location → Geo (the only ref to Geo)
+            (SITE_ROW, "site-orphan", [GEO_ROW]),
+        ],
+    )
+
+    build_oc_wide(
+        oc,
+        msr_rows=[
+            {
+                "row_id": 1, "pid": "NEW_OC_pid",
+                "p__produced_by": [501],
+                "p__has_material_category": [OC_ROCK_CONCEPT_ID],
+            },
+        ],
+        concept_rows=OC_CONCEPT_ROWS,
+        se_rows=[(501, "se-new", [601], None)],
+        geo_rows=[(601, "geo-new", 46.0, 11.0)],
+    )
+
+    r = run_ingest(src, oc, out)
+    assert r.returncode == 0, f"ingest failed:\nSTDERR: {r.stderr}\nSTDOUT: {r.stdout}"
+
+    con = duckdb.connect()
+
+    # All three orphan entities must be gone
+    for pid, otype, label in [
+        ("se-orphan", "SamplingEvent", "SE row_id=10"),
+        ("site-orphan", "SamplingSite", "Site row_id=20"),
+        ("geo-orphan", "GeospatialCoordLocation", "Geo row_id=30"),
+    ]:
+        n = con.sql(
+            f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='{pid}' AND otype='{otype}'"
+        ).fetchone()[0]
+        assert n == 0, f"{label} ({pid}) should be REMOVED — no surviving refs; got count={n}"
+
+    # Zero dangling refs in output
+    for col in ("p__sample_location", "p__sampling_site", "p__site_location"):
+        dangling = con.sql(f"""
+            WITH ids AS (SELECT row_id FROM read_parquet('{out}')),
+            refs AS (SELECT unnest({col}) AS ref_id FROM read_parquet('{out}')
+                     WHERE {col} IS NOT NULL AND len({col}) > 0)
+            SELECT COUNT(*) FROM refs LEFT JOIN ids ON refs.ref_id = ids.row_id
+            WHERE ids.row_id IS NULL
+        """).fetchone()[0]
+        assert dangling == 0, f"{col}: {dangling} dangling refs (expected 0)"
+
+    con.close()
+
+
+def test_unresolved_new_ref_hard_fails(tmp_path):
+    """A new OC SE with p__sampling_site pointing to a SamplingSite absent from
+    Eric's OC wide must cause the ingest to RAISE (non-zero exit), NOT silently
+    emit NULL for that reference.
+
+    This tests Fix B (silent-drop guard): after remapping, if the remapped array
+    length != source array length, the build must hard-fail.
+
+    Scenario:
+      Eric's OC wide:
+        - MSR pid='new_pid' → SE row_id=201, p__produced_by=[201]
+        - SE row_id=201, p__sampling_site=[999]  ← Site row_id=999
+        - NO SamplingSite row_id=999 exists in Eric's wide
+        - Geo row_id=301 exists (SE's p__sample_location=[301])
+
+    Expected: ingest raises RuntimeError / exits non-zero.
+    The output file must NOT be written.
+    """
+    src = str(tmp_path / "src_miss.parquet")
+    oc = str(tmp_path / "oc_miss.parquet")
+    out = str(tmp_path / "out_miss.parquet")
+
+    # Minimal src: just concepts + a stale OC MSR (so there's something to remove)
+    build_src_wide(
+        src,
+        msr_rows=[
+            {
+                "row_id": 1000, "pid": "pid_stale", "n": "OPENCONTEXT",
+                "p__produced_by": [100],
+                "p__has_material_category": [SRC_ROCK_CONCEPT_ID],
+                "latitude": 40.0, "longitude": 5.0,
+            },
+        ],
+        concept_rows=SRC_CONCEPT_ROWS,
+        se_rows=[(100, "se-stale", [110], None)],
+        geo_rows=[(110, "geo-stale", 40.0, 5.0)],
+    )
+
+    # Eric's OC wide: new MSR whose SE has p__sampling_site=[999] but Site 999 is absent
+    build_oc_wide(
+        oc,
+        msr_rows=[
+            {
+                "row_id": 1, "pid": "new_pid",
+                "p__produced_by": [201],
+                "p__has_material_category": [OC_ROCK_CONCEPT_ID],
+            },
+        ],
+        concept_rows=OC_CONCEPT_ROWS,
+        # SE references Site 999 via p__sampling_site — but Site 999 is not in the wide
+        se_rows=[(201, "se-new-missing-site", [301], [999])],
+        geo_rows=[(301, "geo-new", 41.0, 6.0)],
+        # site_rows intentionally omitted — no Site 999
+    )
+
+    r = run_ingest(src, oc, out)
+    combined = r.stdout + r.stderr
+    assert r.returncode != 0, (
+        f"Expected ingest to FAIL (non-zero exit) when p__sampling_site ref is unresolvable, "
+        f"but it exited 0.\nSTDOUT:\n{r.stdout}\nSTDERR:\n{r.stderr}"
+    )
+    # Any of the guard's markers is accepted; "mismatch" is matched case-insensitively.
+    guard_fired = (
+        any(marker in combined for marker in ("SILENT-DROP", "GUARD", "FATAL"))
+        or "mismatch" in combined.lower()
+    )
+    assert guard_fired, (
+        f"Expected a silent-drop guard / FATAL error message; got:\n{combined[:500]}"
+    )
+    assert not os.path.exists(out), (
+        "Output file must NOT be written when the silent-drop guard fires"
+    )
+
+
+# ============================================================================
+# Fix 1 (Round 7) — p__keywords concepts fully extracted and preserved
+# ============================================================================
+
+# Keyword concept IDs in Eric's space (INTEGER)
+OC_KW1_CONCEPT_ID = 950   # keyword concept already in src
+OC_KW2_CONCEPT_ID = 951   # keyword concept NOT in src — must be minted
+
+KW1_URI = "https://w3id.org/isample/keyword/1.0/existing_keyword"
+KW2_URI = "https://w3id.org/isample/keyword/1.0/new_keyword"  # absent from src → minted
+
+# src concept ID for the existing keyword (BIGINT)
+SRC_KW1_CONCEPT_ID = 10
+
+
+def test_new_msr_keywords_preserved(tmp_path):
+    """New OC MSR with p__keywords pointing to concept(s) — at least one NOT in src
+    (forcing a mint) — must have all keyword refs preserved in output:
+      - output p__keywords array length == source array length
+      - all targets resolve to IdentifiedConcept rows in output
+      - zero dangling keyword refs in output
+
+    This test MUST FAIL on old HEAD (where keywords were silently dropped because
+    keyword concept URIs were not collected in new_concept_refs and thus not minted,
+    causing the remap_msr_kw inner join to produce no matches → remap length 0 vs
+    source length 2 → silent-drop guard fires → FATAL).
+
+    After FIX 1: keywords are included in new_concept_refs; missing keyword concepts
+    are minted; the full-length remap is verified by the silent-drop guard; all refs
+    are valid in output.
+    """
+    src = str(tmp_path / "src_kw.parquet")
+    oc = str(tmp_path / "oc_kw.parquet")
+    out = str(tmp_path / "out_kw.parquet")
+
+    # ---- src wide: has existing keyword concept (KW1), NOT KW2 ----
+    src_concepts = list(SRC_CONCEPT_ROWS) + [(SRC_KW1_CONCEPT_ID, KW1_URI)]
+
+    build_src_wide(
+        src,
+        msr_rows=[
+            # Stale OC MSR (to ensure removal path is exercised)
+            {
+                "row_id": 1000, "pid": "pid_stale", "n": "OPENCONTEXT",
+                "p__produced_by": [103],
+                "p__has_material_category": [SRC_ROCK_CONCEPT_ID],
+                "latitude": 60.0, "longitude": 20.0,
+            },
+        ],
+        concept_rows=src_concepts,
+        se_rows=[(103, "se-stale", [203], None)],
+        geo_rows=[(203, "geo-stale", 60.0, 20.0)],
+    )
+
+    # ---- OC wide: new MSR with p__keywords=[OC_KW1_CONCEPT_ID, OC_KW2_CONCEPT_ID]
+    # KW1 is already in src (must be looked up by URI, not minted)
+    # KW2 is NOT in src (must be minted as a new IdentifiedConcept row)
+    oc_concepts = list(OC_CONCEPT_ROWS) + [
+        (OC_KW1_CONCEPT_ID, KW1_URI, "Existing Keyword"),
+        (OC_KW2_CONCEPT_ID, KW2_URI, "New Keyword"),
+    ]
+
+    build_oc_wide(
+        oc,
+        msr_rows=[
+            {
+                "row_id": 1, "pid": "pid_kw",
+                "p__produced_by": [101],
+                "p__has_material_category": [OC_ROCK_CONCEPT_ID],
+                "p__keywords": [OC_KW1_CONCEPT_ID, OC_KW2_CONCEPT_ID],
+            },
+        ],
+        concept_rows=oc_concepts,
+        se_rows=[(101, "se-kw", [201], None)],
+        geo_rows=[(201, "geo-kw", 45.0, 10.0)],
+    )
+
+    r = run_ingest(src, oc, out)
+    assert r.returncode == 0, (
+        f"ingest failed:\nSTDERR: {r.stderr}\nSTDOUT: {r.stdout}"
+    )
+
+    con = duckdb.connect()
+
+    # 1. New OC MSR must be present, with both keyword refs intact
+    kw_msr = con.sql(
+        f"SELECT p__keywords FROM read_parquet('{out}') "
+        f"WHERE pid='pid_kw' AND otype='MaterialSampleRecord'"
+    ).fetchone()
+    assert kw_msr is not None, "pid_kw MSR missing from output"
+    kw_refs = kw_msr[0]
+    assert kw_refs is not None, "p__keywords is NULL in output (should be a 2-element array)"
+    assert len(kw_refs) == 2, (
+        f"p__keywords length mismatch: expected 2, got {len(kw_refs)}. "
+        f"Array: {kw_refs}"
+    )
+
+    # 2. Both keyword targets must resolve to IdentifiedConcept rows in output
+    for ref_id in kw_refs:
+        resolved = con.sql(
+            f"SELECT pid, otype FROM read_parquet('{out}') WHERE row_id = {ref_id}"
+        ).fetchone()
+        assert resolved is not None, (
+            f"Keyword ref row_id={ref_id} not found in output (dangling ref!)"
+        )
+        assert resolved[1] == "IdentifiedConcept", (
+            f"Keyword ref row_id={ref_id} resolves to otype={resolved[1]!r}, "
+            f"expected 'IdentifiedConcept'"
+        )
+
+    # 3. Verify both URIs are resolvable in output IdentifiedConcept rows
+    kw1_out = con.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') "
+        f"WHERE otype='IdentifiedConcept' AND pid='{KW1_URI}'"
+    ).fetchone()[0]
+    assert kw1_out == 1, f"KW1 concept ({KW1_URI}) missing from output"
+
+    kw2_out = con.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') "
+        f"WHERE otype='IdentifiedConcept' AND pid='{KW2_URI}'"
+    ).fetchone()[0]
+    assert kw2_out == 1, f"KW2 concept ({KW2_URI}) not minted in output (should have been minted)"
+
+    # 4. Zero dangling p__keywords refs in output
+    dangling = con.sql(f"""
+        WITH all_row_ids AS (SELECT row_id FROM read_parquet('{out}')),
+        kw_refs AS (
+            SELECT unnest(p__keywords) AS ref_id
+            FROM read_parquet('{out}')
+            WHERE p__keywords IS NOT NULL AND len(p__keywords) > 0
+        )
+        SELECT COUNT(*) FROM kw_refs
+        LEFT JOIN all_row_ids ON kw_refs.ref_id = all_row_ids.row_id
+        WHERE all_row_ids.row_id IS NULL
+    """).fetchone()[0]
+    assert dangling == 0, f"p__keywords: {dangling} dangling refs in output (expected 0)"
+
+    con.close()
+
+
+# ============================================================================
+# Round 8 — concept-label search (#277 part 2)
+# ============================================================================
+
+def test_concept_label_search(tmp_path):
+    """build_sample_facets_v2 must append IC labels to description so that
+    textSearch on concept terms (e.g. 'pottery') matches samples tagged with
+    that concept even when the word does not appear in label/description/place_name.
+
+    Scenario (single wide parquet, post-ingest shape — BLOB geometry, BIGINT arrays):
+      - MSR pid='pid_pottery' → p__keywords → IC row_id=1001 (label='pottery')
+                              → p__has_material_category → IC row_id=1002 (label='rock')
+        label='some label', description='some description'
+        (neither 'pottery' nor 'rock' appears in original label/description)
+      - MSR pid='pid_no_concept' — no concept refs at all, description='plain desc'
+      - IC row_id=1001 label='pottery'
+      - IC row_id=1002 label='rock'
+
+    After build_sample_facets_v2:
+      - pid_pottery description must contain 'pottery' AND 'rock'
+      - pid_no_concept description must equal 'plain desc' (no appended space)
+      - ILIKE '%pottery%' must find pid_pottery
+      - ILIKE '%pottery%' must NOT find pid_no_concept
+    """
+    import time
+    B = _load_build_frontend_derived()
+
+    wide = str(tmp_path / "wide_concept_label.parquet")
+    out = str(tmp_path / "facets_v2_cl.parquet")
+
+    # --- build synthetic wide parquet in post-ingest shape (BLOB geometry, BIGINT arrays) ---
+    con_build = duckdb.connect()
+    try:
+        con_build.execute("INSTALL spatial; LOAD spatial;")
+        con_build.execute(f"""
+            COPY (
+                -- MSR pid_pottery: p__keywords=[1001], p__has_material_category=[1002]
+                -- original label/description do NOT contain 'pottery' or 'rock'
+                SELECT 1::BIGINT AS row_id,
+                       'pid_pottery' AS pid,
+                       'MaterialSampleRecord' AS otype,
+                       'SESAR'::VARCHAR AS n,
+                       ST_AsWKB(ST_Point(10.0, 45.0))::BLOB AS geometry,
+                       45.0::DOUBLE AS latitude,
+                       10.0::DOUBLE AS longitude,
+                       'some label'::VARCHAR AS label,
+                       'some description'::VARCHAR AS description,
+                       ['Somewhere']::VARCHAR[] AS place_name,
+                       NULL::TIMESTAMP AS result_time,
+                       [1002]::BIGINT[] AS p__has_material_category,
+                       NULL::BIGINT[] AS p__has_sample_object_type,
+                       NULL::BIGINT[] AS p__has_context_category,
+                       NULL::BIGINT[] AS p__produced_by,
+                       NULL::BIGINT[] AS p__sample_location,
+                       NULL::BIGINT[] AS p__sampling_site,
+                       NULL::BIGINT[] AS p__site_location,
+                       NULL::BIGINT[] AS p__registrant,
+                       [1001]::BIGINT[] AS p__keywords,
+                       NULL::BIGINT[] AS p__responsibility,
+                       NULL::INTEGER[] AS p__curation,
+                       NULL::BIGINT[] AS p__related_resource,
+                       NULL::VARCHAR AS thumbnail_url,
+                       NULL::VARCHAR AS scheme_name,
+                       NULL::VARCHAR AS scheme_uri
+                UNION ALL
+                -- MSR pid_no_concept: no concept refs; description must be unchanged
+                SELECT 2::BIGINT, 'pid_no_concept', 'MaterialSampleRecord', 'SESAR',
+                       ST_AsWKB(ST_Point(11.0, 46.0))::BLOB,
+                       46.0::DOUBLE, 11.0::DOUBLE,
+                       'no-concept label'::VARCHAR, 'plain desc'::VARCHAR,
+                       ['Elsewhere']::VARCHAR[], NULL::TIMESTAMP,
+                       NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[],
+                       NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[],
+                       NULL::BIGINT[], NULL::BIGINT[], NULL::INTEGER[], NULL::BIGINT[],
+                       NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR
+                UNION ALL
+                -- IC row_id=1001: label='pottery'
+                SELECT 1001::BIGINT, 'https://example.org/kw/pottery',
+                       'IdentifiedConcept', NULL::VARCHAR,
+                       NULL::BLOB, NULL::DOUBLE, NULL::DOUBLE,
+                       'pottery'::VARCHAR, NULL::VARCHAR,
+                       NULL::VARCHAR[], NULL::TIMESTAMP,
+                       NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[],
+                       NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[],
+                       NULL::BIGINT[], NULL::BIGINT[], NULL::INTEGER[], NULL::BIGINT[],
+                       NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR
+                UNION ALL
+                -- IC row_id=1002: label='rock'
+                SELECT 1002::BIGINT, 'https://example.org/mat/rock',
+                       'IdentifiedConcept', NULL::VARCHAR,
+                       NULL::BLOB, NULL::DOUBLE, NULL::DOUBLE,
+                       'rock'::VARCHAR, NULL::VARCHAR,
+                       NULL::VARCHAR[], NULL::TIMESTAMP,
+                       NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[],
+                       NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[], NULL::BIGINT[],
+                       NULL::BIGINT[], NULL::BIGINT[], NULL::INTEGER[], NULL::BIGINT[],
+                       NULL::VARCHAR, NULL::VARCHAR, NULL::VARCHAR
+            ) TO '{wide}' (FORMAT PARQUET)
+        """)
+    finally:
+        con_build.close()
+
+    # --- run build_base_tables + build_sample_facets_v2 ---
+    con = duckdb.connect()
+    try:
+        con.execute("INSTALL h3 FROM community; LOAD h3; INSTALL spatial; LOAD spatial;")
+        t0 = time.time()
+        B.build_base_tables(con, wide, t0)
+        B.build_sample_facets_v2(con, out)
+    finally:
+        con.close()
+
+    # --- assertions ---
+    con2 = duckdb.connect()
+
+    # 1. pid_pottery description must contain 'pottery' (from p__keywords IC label)
+    pottery_desc = con2.sql(
+        f"SELECT description FROM read_parquet('{out}') WHERE pid='pid_pottery'"
+    ).fetchone()
+    assert pottery_desc is not None, "pid_pottery missing from facets_v2 output"
+    desc = pottery_desc[0]
+    assert desc is not None, "pid_pottery description is NULL"
+    assert 'pottery' in desc.lower(), (
+        f"'pottery' not found in pid_pottery description: {desc!r}"
+    )
+
+    # 2. pid_pottery description must also contain 'rock' (from p__has_material_category IC label)
+    assert 'rock' in desc.lower(), (
+        f"'rock' not found in pid_pottery description: {desc!r}"
+    )
+
+    # 3. pid_no_concept description must be unchanged ('plain desc' with no appended garbage)
+    no_concept_desc = con2.sql(
+        f"SELECT description FROM read_parquet('{out}') WHERE pid='pid_no_concept'"
+    ).fetchone()
+    assert no_concept_desc is not None, "pid_no_concept missing from facets_v2 output"
+    assert no_concept_desc[0] == 'plain desc', (
+        f"pid_no_concept description changed unexpectedly: {no_concept_desc[0]!r}"
+    )
+
+    # 4. ILIKE '%pottery%' finds pid_pottery (simulates explorer textSearch)
+    pottery_hits = con2.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE description ILIKE '%pottery%'"
+    ).fetchone()[0]
+    assert pottery_hits == 1, (
+        f"Expected 1 pottery hit via ILIKE, got {pottery_hits}"
+    )
+
+    # 5. pid_no_concept is NOT matched by ILIKE '%pottery%'
+    no_hit = con2.sql(
+        f"SELECT COUNT(*) FROM read_parquet('{out}') WHERE pid='pid_no_concept' AND description ILIKE '%pottery%'"
+    ).fetchone()[0]
+    assert no_hit == 0, (
+        f"pid_no_concept should NOT match 'pottery' but did (description: {no_concept_desc[0]!r})"
+    )
+
+    con2.close()