Synchronize data between a vertex table and a trajectory table


Raw trajectory data typically arrives as discrete points. To run spatial analysis, those points must be aggregated into complete trajectories stored in a trajectory table. GanosBase provides two approaches to keep a vertex table and a trajectory table in sync.
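The statements in this topic assume that sample_points (the vertex table) and trajectory_table already exist, but their definitions are not shown. The following is only a minimal sketch of a schema that would fit the column names used here. The column types, and the use of the ganos_trajectory extension with its trajectory type, are assumptions; adapt them to your data. The one firm requirement is a primary key or unique constraint on trajectory_table.userid, because the ON CONFLICT(userid) clauses in both sync functions need one.

```sql
-- Assumed prerequisite: the GanosBase trajectory extension.
CREATE EXTENSION IF NOT EXISTS ganos_trajectory CASCADE;

-- Hypothetical vertex table: one row per sampled point.
CREATE TABLE IF NOT EXISTS sample_points (
    userid      integer,
    sample_time timestamp,
    x           double precision,
    y           double precision,
    z           double precision,
    intensity   integer
);

-- Hypothetical trajectory table: one trajectory per user.
-- The primary key on userid is what ON CONFLICT (userid) relies on.
CREATE TABLE IF NOT EXISTS trajectory_table (
    userid integer PRIMARY KEY,
    traj   trajectory
);
```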

Choose a synchronization method

| | Manual synchronization | Automatic synchronization |
| --- | --- | --- |
| How it works | A user-defined function (UDF) batches unsynced points and appends them to the trajectory table on demand | A trigger fires on every INSERT and appends the new point to the trajectory table |
| Real-time sync | No | Yes |
| Write performance | Does not affect write performance of the vertex table | Each INSERT incurs additional aggregation cost, which increases write latency |
| When to use | Batch or periodic ingestion where real-time accuracy is not required | Streaming ingestion where the trajectory table must stay current |

Manual synchronization

Manual synchronization uses a UDF to batch-process all unsynced points. Run it on a schedule or on demand.

Step 1: Prepare the vertex table

Add a sync column to track which points have been processed, and create an index on it to make the UDF query efficient.

-- Add a boolean column to flag unsynced points.
ALTER TABLE sample_points ADD COLUMN sync bool DEFAULT false;
CREATE INDEX ON sample_points USING btree(sync);
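A boolean column has only two values, so the plain B-tree index above also indexes every row that has already been synced. A possible alternative is a partial index that covers only the pending rows, which stays small as long as most points have been processed. This is a sketch, not part of the documented procedure; the index name is hypothetical, and whether it helps depends on your data volume.

```sql
-- Hypothetical alternative to the full index on sync:
-- index only the rows that still wait to be synced.
CREATE INDEX sample_points_unsynced_idx
    ON sample_points (userid)
    WHERE sync = false;
```

Queries must filter on sync = false, as the sync function does, for the planner to consider this index.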

Step 2: Create the sync UDF

The following function reads all rows where sync = false, builds a sorted trajectory per user with ST_Sort and ST_MakeTrajectory, and upserts the result into trajectory_table. After the upsert, it marks those rows as synced.

CREATE OR REPLACE FUNCTION trajectory_cast_append()
RETURNS void
AS $$
BEGIN
    -- Aggregate all unsynced points per user into a time-sorted trajectory
    -- and append it to that user's row in trajectory_table.
    INSERT INTO trajectory_table
    SELECT pnts.userid,
           ST_Sort(ST_MakeTrajectory(pnts.tjraw, true, '{"intensity"}'::cstring[]))
    FROM (
        SELECT sample_points.userid,
               array_agg(ROW(sample_points.sample_time, sample_points.x, sample_points.y,
                             sample_points.z, sample_points.intensity)) AS tjraw
        FROM sample_points
        WHERE sync = false
        GROUP BY sample_points.userid
    ) pnts
    ON CONFLICT (userid) DO UPDATE
        SET traj = ST_Append(trajectory_table.traj, excluded.traj);

    -- Flag the processed points so that the next run skips them.
    UPDATE sample_points
    SET sync = true
    WHERE sync = false;
END;
$$ LANGUAGE plpgsql;
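The function above issues two statements. Under PostgreSQL's default READ COMMITTED isolation level each statement takes its own snapshot, so a point that another session commits between the INSERT and the UPDATE would be flagged as synced without having been aggregated. If ingestion can continue while the sync runs, one possible way to close that gap is to flag the rows and aggregate exactly those rows in a single statement with a data-modifying CTE. The following is a sketch under that assumption, not the documented GanosBase procedure; the function name trajectory_cast_append_atomic is hypothetical.

```sql
CREATE OR REPLACE FUNCTION trajectory_cast_append_atomic()
RETURNS void
AS $$
BEGIN
    WITH pending AS (
        -- Flag the unsynced rows and hand exactly those rows to the INSERT.
        UPDATE sample_points
        SET sync = true
        WHERE sync = false
        RETURNING userid, sample_time, x, y, z, intensity
    )
    INSERT INTO trajectory_table
    SELECT pending.userid,
           ST_Sort(ST_MakeTrajectory(
               array_agg(ROW(pending.sample_time, pending.x, pending.y,
                             pending.z, pending.intensity)),
               true, '{"intensity"}'::cstring[]))
    FROM pending
    GROUP BY pending.userid
    ON CONFLICT (userid) DO UPDATE
        SET traj = ST_Append(trajectory_table.traj, excluded.traj);
END;
$$ LANGUAGE plpgsql;
```

Because the UPDATE and the INSERT are one statement, they see the same set of rows, and a failure in either part rolls back both.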

Step 3: Run the sync task

SELECT trajectory_cast_append();

Call this statement whenever you want to push pending points to the trajectory table.
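To run the sync on a schedule from inside the database, one option is the pg_cron extension. Whether pg_cron is available and enabled on your instance is an assumption here, and the job name and the five-minute interval are illustrative only. The first query is a plain PostgreSQL check of how many points are still waiting to be synced.

```sql
-- Count the points that the next sync run would pick up.
SELECT count(*) AS pending_points
FROM sample_points
WHERE sync = false;

-- Assumes pg_cron (1.3 or later, for named jobs) is installed;
-- 'trajectory-sync' is a hypothetical job name.
-- Runs the sync function every five minutes.
SELECT cron.schedule('trajectory-sync', '*/5 * * * *',
                     'SELECT trajectory_cast_append()');

-- Remove the job when it is no longer needed.
SELECT cron.unschedule('trajectory-sync');
```

An external scheduler that connects and runs the same SELECT statement would work equally well.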

Note: Complete Step 1 before you create and run the UDF. Without the sync column, the function cannot distinguish processed rows from unprocessed rows.

Automatic synchronization

Automatic synchronization uses a row-level trigger that fires after every INSERT on the vertex table. The trigger function calls ST_MakeTrajectory and upserts the new point into trajectory_table. Because the trigger runs on every INSERT, the aggregation cost adds directly to write latency on sample_points.

Step 1: Create the trigger function

The following function builds a single-point trajectory from the newly inserted row and appends it to the matching user's trajectory using ST_Append.

CREATE OR REPLACE FUNCTION trajectory_sync_point() RETURNS TRIGGER AS $$
BEGIN
    -- Build a single-point trajectory from the new row and append it to the
    -- user's existing trajectory, or insert it if the user has no row yet.
    INSERT INTO trajectory_table
    SELECT NEW.userid,
           ST_MakeTrajectory(array_agg(ROW(NEW.sample_time, NEW.x, NEW.y, NEW.z, NEW.intensity)),
                             true, '{"intensity"}'::cstring[])
    ON CONFLICT (userid) DO UPDATE
        SET traj = ST_Append(trajectory_table.traj, excluded.traj);
    -- The return value of an AFTER row-level trigger is ignored.
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

Step 2: Attach the trigger

CREATE TRIGGER point_trigger AFTER INSERT ON sample_points
    FOR EACH ROW EXECUTE PROCEDURE trajectory_sync_point();

After this trigger is in place, every INSERT into sample_points automatically updates trajectory_table.
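A quick way to confirm that the trigger works is to insert one point and read back that user's trajectory. The sketch below assumes the column names used throughout this topic; the user ID and coordinate values are made up for illustration. It also shows the standard PostgreSQL statements for switching the trigger off during a bulk load, which is an optional pattern rather than part of the documented procedure.

```sql
-- Insert one point for a hypothetical user 1; the trigger fires once.
INSERT INTO sample_points (userid, sample_time, x, y, z, intensity)
VALUES (1, '2020-04-11 17:42:30', 114.35, 39.28, 4, 80);

-- The user's trajectory should now include the new point.
SELECT userid, traj
FROM trajectory_table
WHERE userid = 1;

-- Temporarily skip the trigger during a bulk load, then re-enable it.
ALTER TABLE sample_points DISABLE TRIGGER point_trigger;
-- ... bulk load here ...
ALTER TABLE sample_points ENABLE TRIGGER point_trigger;
```

Points loaded while the trigger is disabled are not added to trajectory_table, so they need a separate batch sync afterwards.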