Last Updated: January 5, 2026
Geocoding a few thousand addresses is straightforward, but very large datasets, like millions of records stored in a database, require a workflow that is optimized for speed and repeatability. This guide explains how to batch geocode a large address table by exporting records from PostgreSQL in batches, geocoding them in Maptitude mapping software, then writing latitude/longitude results back into the database using an automated Python script.
While Maptitude can connect directly to PostgreSQL, it cannot retrieve records in batches. Exporting data into batch CSV files provides a faster and more controllable geocoding pipeline for large-scale jobs.
Table of Contents
1. Prepare Your Data and Install Requirements
2. Export a Batch of Addresses from PostgreSQL
3. Geocode a Batch in Maptitude
4. Export Geocoded Results and Update PostgreSQL
5. Tips and Troubleshooting
Conclusion
When to Use This Workflow
- You have hundreds of thousands to millions of address records.
- Your data is stored in a PostgreSQL (or similar) database.
- You need an automated workflow that can run continuously without manual intervention.
- You want latitude/longitude written back into your database.
Step 1: Prepare Your Data and Install Requirements
- Download and install a free trial of Maptitude mapping software, or use your licensed copy.
- Install Python 3.x.
- Install the required packages: caliperpy, psycopg2, and pandas.
Your database table should include:
- A unique record ID (example: primary_key)
- Address field(s) (example: address_line_1)
- Postal/ZIP field (example: owner_zip)
- Latitude and longitude fields (latitude, longitude)
- A status field to track progress (example: geolocated)
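Before the first run, it can help to confirm that the table actually has these columns. The sketch below is a minimal check, not part of the original script. The helper name missing_columns is hypothetical, and the column names are the example names used in this guide.

```python
# Example column names taken from this guide; adjust to match your own table.
REQUIRED_COLUMNS = (
    "primary_key",
    "address_line_1",
    "owner_zip",
    "latitude",
    "longitude",
    "geolocated",
)


def missing_columns(existing_columns, required=REQUIRED_COLUMNS):
    """Return the required column names that are absent from existing_columns."""
    # PostgreSQL folds unquoted identifiers to lowercase, so compare in lowercase.
    existing = {name.lower() for name in existing_columns}
    return [name for name in required if name not in existing]


# The column names could come from a catalog query such as:
#   SELECT column_name FROM information_schema.columns
#   WHERE table_schema = %s AND table_name = %s
print(missing_columns(["primary_key", "address_line_1", "owner_zip"]))
```

An empty list means every expected column is present.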
Step 2: Export a Batch of Addresses from PostgreSQL
The script connects to PostgreSQL and exports a batch of records where geolocated IS NULL into a CSV file. Batching prevents large-scale jobs from timing out and makes the workflow easier to restart if needed.
- Set your PostgreSQL connection parameters (host, database, schema, table, user, password).
- Set a batch size (example: 10000 records per run).
- Run the export step to generate a CSV batch file.
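The restartable batching pattern can be tried without a PostgreSQL server. The sketch below uses Python's built-in sqlite3 module as a stand-in database, which is an assumption made only so the example runs anywhere. The table name addresses and the helper names are hypothetical. The ORDER BY clause is not in the original query and is added only to make batches deterministic.

```python
import sqlite3


def next_batch(conn, batch_size):
    """Return up to batch_size rows that have not been attempted yet."""
    cur = conn.execute(
        "SELECT primary_key, address_line_1, owner_zip "
        "FROM addresses WHERE geolocated IS NULL "
        "ORDER BY primary_key LIMIT ?",
        (batch_size,),
    )
    return cur.fetchall()


def mark_attempted(conn, ids):
    """Flag every record in the batch so the next run skips it."""
    conn.executemany(
        "UPDATE addresses SET geolocated = 'attempted' WHERE primary_key = ?",
        [(record_id,) for record_id in ids],
    )
    conn.commit()


def run_demo(total=25, batch_size=10):
    """Process an in-memory table in batches and return each batch size."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE addresses (primary_key INTEGER PRIMARY KEY, "
        "address_line_1 TEXT, owner_zip TEXT, "
        "latitude REAL, longitude REAL, geolocated TEXT)"
    )
    conn.executemany(
        "INSERT INTO addresses (primary_key, address_line_1, owner_zip) "
        "VALUES (?, ?, ?)",
        [(i, f"{i} Main St", "00000") for i in range(1, total + 1)],
    )
    sizes = []
    while True:
        batch = next_batch(conn, batch_size)
        if not batch:
            break  # nothing left where geolocated IS NULL
        sizes.append(len(batch))
        mark_attempted(conn, [row[0] for row in batch])
    conn.close()
    return sizes


print(run_demo())
```

Because each batch is selected by its status value, an interrupted job resumes at the first unprocessed record the next time it runs.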
Step 3: Geocode a Batch in Maptitude
The script uses caliperpy to connect to Maptitude and geocode the batch CSV. The geocoding method used in this script is Address + ZIP (street address field + postal/ZIP field).
- The CSV is opened as a table in Maptitude.
- A Data.Geocoder object runs LocateView() using:
  - Address field: address_line_1
  - ZIP field: owner_zip
- Maptitude creates an output layer and exports the geocoded results to CSV.
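The script inspects the value returned by LocateView() to find the output layer name and the number of located records. The sketch below isolates that parsing step. It assumes the result arrives either as a dict or as a tuple of (key, value) pairs, which is inferred from the script's own checks rather than from Maptitude documentation. The helper names are hypothetical.

```python
def result_to_dict(result):
    """Convert a dict or a sequence of (key, value) pairs into a plain dict."""
    if isinstance(result, dict):
        return dict(result)
    pairs = {}
    if isinstance(result, (tuple, list)):
        for item in result:
            # Keep only well-formed (name, value) pairs and ignore anything else.
            if (
                isinstance(item, (tuple, list))
                and len(item) == 2
                and isinstance(item[0], str)
            ):
                pairs[item[0]] = item[1]
    return pairs


def summarize_result(result, default_layer):
    """Return (layer_name, num_located), falling back to the requested layer name."""
    info = result_to_dict(result)
    return info.get("LayerName", default_layer), info.get("NumLocated")


example = (("LayerName", "geocoding_view_Layer"), ("NumLocated", 9421))
print(summarize_result(example, "geocoding_view_Layer"))
```

Falling back to the requested layer name mirrors what the script does when the result does not include one.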
Step 4: Export Geocoded Results and Update PostgreSQL
After geocoding, the script exports the results and writes them back to PostgreSQL. To improve performance, it uses temporary tables and PostgreSQL’s COPY command to load batch updates quickly.
- Export the geocoded layer to a CSV (including Longitude and Latitude).
- Load the geocoded CSV into a temporary table using COPY.
- Update latitude and longitude in your main table using JOIN-based updates.
- Mark all batch records as attempted (example: set geolocated = 'attempted').
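The load-and-update sequence can be condensed into a single function that works on an open psycopg2 cursor. This is a sketch rather than the script's exact method. It uses copy_expert with COPY in CSV mode instead of copy_from, marks the batch with = ANY(%s) instead of a second temporary table, and assumes the geocoded CSV has a header row followed by exactly three columns. The function name apply_geocoded_batch and the table name tmp_geocoded are hypothetical.

```python
def apply_geocoded_batch(cursor, csv_file, target, batch_ids):
    """Load geocoded rows through a temp table, then update the target table.

    cursor    - an open psycopg2 cursor (the caller commits the transaction)
    csv_file  - open text file: header row, then primary_key,longitude,latitude
    target    - trusted, schema-qualified table name such as "public.addresses"
    batch_ids - every primary_key exported in this batch
    """
    # ON COMMIT DROP removes the temp table when the transaction ends.
    cursor.execute(
        "CREATE TEMPORARY TABLE tmp_geocoded ("
        "primary_key INTEGER PRIMARY KEY, "
        "longitude DOUBLE PRECISION, "
        "latitude DOUBLE PRECISION"
        ") ON COMMIT DROP"
    )
    # CSV mode handles quoted fields, and HEADER true skips the first line.
    cursor.copy_expert(
        "COPY tmp_geocoded (primary_key, longitude, latitude) "
        "FROM STDIN WITH (FORMAT csv, HEADER true)",
        csv_file,
    )
    # JOIN-based update of coordinates for the records that were located.
    # target is interpolated directly, so it must never come from user input.
    cursor.execute(
        f"UPDATE {target} AS t "
        "SET latitude = g.latitude, longitude = g.longitude "
        "FROM tmp_geocoded AS g WHERE t.primary_key = g.primary_key"
    )
    # Mark the whole batch as attempted, located or not.
    # psycopg2 adapts a Python list to a PostgreSQL array for ANY().
    cursor.execute(
        f"UPDATE {target} SET geolocated = 'attempted' "
        "WHERE primary_key = ANY(%s)",
        (list(batch_ids),),
    )
```

Running all four statements in one transaction means a failed batch leaves the main table unchanged, so the same records are simply picked up again on the next run.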
Step 5: Tips and Troubleshooting
- Layer removal warnings: The script attempts to drop a layer that may not exist yet. This is usually harmless and can be improved by checking for the layer first.
- Zero matches in a batch: If no addresses geocode successfully, Maptitude may not export a geocoded CSV. Add error handling to mark the batch as attempted even when zero results are returned to avoid reprocessing the same records.
- Alternate geocoding methods: This script uses Address + ZIP only. You can expand the input fields to include city/state or add toggles between geocoding strategies.
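One way to handle the zero-match case is to treat a missing geocoded CSV as an empty result instead of an error. The sketch below shows a tolerant reader. The function name read_geocoded_rows is hypothetical, and the three-column layout (ID, longitude, latitude) is assumed from the headers the script writes.

```python
import csv
import os


def read_geocoded_rows(path):
    """Return (primary_key, longitude, latitude) tuples, or [] if nothing was exported."""
    if not path or not os.path.exists(path):
        return []  # zero matches: no file was exported
    rows = []
    with open(path, newline="") as f:
        for record in csv.reader(f):
            if len(record) < 3:
                continue
            try:
                rows.append((int(record[0]), float(record[1]), float(record[2])))
            except ValueError:
                continue  # header row or blank/unparseable coordinates
    return rows


print(read_geocoded_rows("batch_that_was_never_exported_geocoded.csv"))
```

With an empty list in hand, the calling code can skip the coordinate update and still set geolocated = 'attempted' for every ID in the batch, so the same records are not exported again.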
import os
import sys
import time
from datetime import datetime

import pandas as pd
import psycopg2
import caliperpy

# Global logging level - set to "normal" or "verbose"
LOGGING_LEVEL = "normal"  # Change to "verbose" for detailed logging


def log(message, level="normal"):
    """
    Logs a message based on the current logging level
    """
    if level == "normal" or LOGGING_LEVEL == "verbose":
        print(message)

def export_data_to_csv(connection_params, output_folder, view_name, batch_size=10000):
    """
    Export data from PostgreSQL to CSV and return the list of all property IDs being processed
    Limited to batch_size records at a time where geolocated is NULL
    """
    log(f"Exporting data for {view_name} to CSV (batch size: {batch_size})...", "verbose")
    conn = psycopg2.connect(
        host=connection_params['host'],
        database=connection_params['database'],
        user=connection_params['user'],
        password=connection_params['password']
    )
    # Create timestamp for unique filename
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_filename = os.path.join(output_folder, f"{view_name}_{timestamp}.csv")
    # Query to export data that needs geocoding - limit to batch_size records where geolocated is NULL
    query = f"""
        SELECT primary_key, address_line_1, owner_zip
        FROM {connection_params['schema']}.{connection_params['table']}
        WHERE geolocated IS NULL
        LIMIT {batch_size}
    """
    # Export to CSV
    df = pd.read_sql(query, conn)
    # Get count of remaining records
    cursor = conn.cursor()
    cursor.execute(f"SELECT COUNT(*) FROM {connection_params['schema']}.{connection_params['table']} WHERE geolocated IS NULL")
    total_remaining = cursor.fetchone()[0]
    cursor.close()
    # Only proceed if we have records to process
    if len(df) > 0:
        df.to_csv(csv_filename, index=False)
        # Get all property IDs being processed
        all_processed_ids = df['primary_key'].tolist()
        conn.close()
        log(f"Exported {len(df)} records to {csv_filename}", "verbose")
        log(f"Total remaining records to process: {total_remaining}", "verbose")
        return csv_filename, all_processed_ids, len(df) > 0
    else:
        conn.close()
        log("No records found with geolocated = NULL. Processing complete.")
        return None, [], False

def geocode_with_maptitude(csv_filename, view_name):
    """
    Geocode using Maptitude with an alternative approach
    """
    log(f"Starting geocoding process for {csv_filename}...", "verbose")
    # Connect to Maptitude using caliperpy
    dk = caliperpy.Maptitude.connect()
    log("Successfully connected to Maptitude", "verbose")
    # Create a simple name for the table (without path or extension)
    base_name = os.path.basename(csv_filename)
    table_name = os.path.splitext(base_name)[0]
    # Generate a unique timestamp for this batch
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Output database path with timestamp to ensure uniqueness
    output_folder = os.path.dirname(csv_filename)
    output_db = os.path.join(output_folder, f"{view_name}_Layer_{timestamp}.dbd")
    # Try to drop any existing layer with the same name
    try:
        # Per documentation: DropLayer(map_name, layer_name)
        # If map_name is null, it affects the current map
        dk.DropLayer(None, f"{view_name}_Layer")
        log(f"Dropped existing layer: {view_name}_Layer", "verbose")
    except Exception as e:
        log(f"No existing layer to drop or error dropping layer: {e}", "verbose")
    # Try a different approach: Create a table and import data from CSV
    log("Creating a new table and importing data from CSV...", "verbose")
    try:
        # First try to use the OpenTable function if the table already exists
        table_view = dk.OpenTable(table_name, "CSV", [csv_filename, None])
        log(f"Table opened successfully: {table_view}", "verbose")
    except Exception as e:
        log(f"Could not open table: {e}", "verbose")
        log("Trying alternative approach...", "verbose")
        # Try using a Python script approach with embedded GISDK
        script = dk.CreateObject("Python",
            """
import caliperpy
import os
def import_csv(csv_path, view_name):
    dk = caliperpy.Maptitude.connect()
    table_name = dk.OpenTable(view_name, "CSV", [csv_path, None])
    return table_name
# The import_csv function will be called by Maptitude
            """
        )
        # Call the Python function from within Maptitude
        table_view = script.import_csv({"csv_path": csv_filename, "view_name": table_name})
        log(f"Table created using Python script: {table_view}", "verbose")
    # Set the view to the imported table
    dk.SetView(table_view)
    # Step 2: Configure geocoding options
    opts = {}
    opts["new_layer_name"] = f"{view_name}_Layer"  # Use consistent layer name
    opts["out_db"] = output_db
    # Step 3: Get field specifications
    log("Getting field specifications...", "verbose")
    id_field = dk.GetFieldFullSpec(table_view, "primary_key")
    address_field = dk.GetFieldFullSpec(table_view, "address_line_1")
    postal_field = dk.GetFieldFullSpec(table_view, "owner_zip")
    # Step 4: Create geocoder object and set region
    log("Creating geocoder object and setting region...", "verbose")
    try:
        # First try with standard CreateObject syntax
        geo = dk.CreateObject("Data.Geocoder", "gis_ui")
        log("Successfully created geocoder object using CreateObject", "verbose")
    except Exception as e:
        log(f"Error creating geocoder object with CreateObject: {e}", "verbose")
        # Alternative approach using RunMacro
        log("Trying alternative approach with RunMacro...", "verbose")
        geo = dk.RunMacro("CreateObject", "Data.Geocoder", "gis_ui")
        log("Successfully created geocoder object using RunMacro", "verbose")
    log("Setting region...", "verbose")
    geo.SetRegion()  # Assumes US region
    # Step 5: Run the geocoding process for street addresses
    log("Running geocoding process...", "verbose")
    # The input_field_specs array needs to be in the correct order for the ADDRESS method
    # The documentation specifies { address_field, address_field_2, postal_field } or
    # { address_field, address_field_2, city_field, state_field, postal_field }
    input_field_specs = [address_field, None, postal_field]
    log(f"Input field specs: {input_field_specs}", "verbose")
    try:
        result = geo.LocateView(
            "ADDRESS",
            f"{table_view}|",
            id_field,
            input_field_specs,
            opts
        )
    except Exception as e:
        log(f"Error calling LocateView: {e}", "verbose")
        log("Trying alternative approach...", "verbose")
        # Try direct RunMacro approach
        result = dk.RunMacro("LocateView", geo, "ADDRESS", f"{table_view}|", id_field, input_field_specs, opts)
        log(f"Result from RunMacro: {result}", "verbose")
    # Check if geocoding was successful and extract NumLocated value
    num_located = None
    if isinstance(result, tuple) and len(result) > 0:
        # Parse the tuple result to find NumLocated
        for item in result:
            if isinstance(item, tuple) and len(item) == 2 and item[0] == 'NumLocated':
                num_located = item[1]
                break
    # Check if geocoding was successful
    if isinstance(result, dict) and "LayerName" not in result:
        log(f"Geocoding failed. Result: {result}", "verbose")
        raise Exception("Geocoding failed to create a layer")
    elif not isinstance(result, dict) and not (isinstance(result, str) and "Layer" in result):
        log(f"Geocoding returned unexpected result type: {type(result)}, value: {result}", "verbose")
        # Try to determine layer name from naming convention
        layer_name = opts["new_layer_name"]
        log(f"Using assumed layer name: {layer_name}", "verbose")
    else:
        if isinstance(result, dict) and "LayerName" in result:
            layer_name = result["LayerName"]
        elif isinstance(result, dict):
            # If result is a dict but LayerName isn't in it, try to find anything with "Layer" in it
            layer_keys = [k for k in result.keys() if "layer" in k.lower()]
            if layer_keys:
                layer_name = result[layer_keys[0]]
            else:
                layer_name = opts["new_layer_name"]
        else:
            # Assume result is the layer name itself
            layer_name = result
    log(f"Geocoding completed. Layer name: {layer_name}", "verbose")
    # Step 6: Export results to CSV
    output_csv = csv_filename.replace(".csv", "_geocoded.csv")
    log(f"Exporting results to {output_csv}", "verbose")
    # Export the results to CSV
    dk.ExportCSV(f"{layer_name}|", output_csv, None)
    # Immediately create a new CSV with proper headers
    log("Creating a new CSV with proper headers...", "verbose")
    try:
        # Read the raw CSV data
        with open(output_csv, 'r') as f:
            csv_content = f.readlines()
        # Determine the number of columns from the first line
        if csv_content:
            first_line = csv_content[0]
            num_columns = len(first_line.strip().split(','))
            log(f"Detected {num_columns} columns in the CSV", "verbose")
            # Create new CSV with headers
            temp_csv = output_csv.replace(".csv", "_with_headers.csv")
            with open(temp_csv, 'w') as f:
                # Write header row
                if num_columns == 3:
                    f.write("primary_key,Longitude,Latitude\n")
                else:
                    header_row = ['primary_key']
                    for i in range(1, num_columns):
                        if i == 1:
                            header_row.append('Longitude')
                        elif i == 2:
                            header_row.append('Latitude')
                        else:
                            header_row.append(f'Field{i+1}')
                    f.write(','.join(header_row) + '\n')
                # Write all the data rows
                f.writelines(csv_content)
            # Replace the original CSV with the new one
            import shutil
            shutil.move(temp_csv, output_csv)
            log(f"Successfully created CSV with headers: {output_csv}", "verbose")
    except Exception as e:
        log(f"Error adding headers to CSV: {e}", "verbose")
    # Step 7: Close map and clean up
    log("Cleaning up...", "verbose")
    # Try to drop the layer after use
    try:
        dk.DropLayer(None, layer_name)
        log(f"Dropped layer after use: {layer_name}", "verbose")
    except Exception as e:
        log(f"Error dropping layer after use: {e}", "verbose")
    dk.CloseMap(None)
    # Disconnect from Maptitude
    caliperpy.Maptitude.disconnect()
    log("Disconnected from Maptitude", "verbose")
    # Return the path to the geocoded CSV and number of records geocoded
    return output_csv, num_located

def import_geocoded_data(connection_params, geocoded_csv, all_processed_ids=None):
    """
    Import geocoded data back to PostgreSQL using two temporary tables:
    1. One for geocoded data (lat/long values) from CSV
    2. One for tracking all attempted records
    Then use fast JOINs to update the main table
    Returns (attempted_updated_count, geocoded_updated_count)
    """
    log(f"Importing geocoded data back to database from {geocoded_csv}...", "verbose")
    try:
        # Check if the CSV exists
        if not os.path.exists(geocoded_csv):
            log(f"Error: Geocoded CSV file not found at {geocoded_csv}")
            # Return two values so the caller can always unpack the result
            return 0, 0
        # Connect to PostgreSQL
        conn = psycopg2.connect(
            host=connection_params['host'],
            database=connection_params['database'],
            user=connection_params['user'],
            password=connection_params['password']
        )
        # Count records in the CSV
        import csv
        with open(geocoded_csv, 'r') as f:
            csv_reader = csv.reader(f)
            next(csv_reader)  # Skip header
            geocoded_csv_count = sum(1 for row in csv_reader)
        log(f"DIAGNOSTIC: Found {geocoded_csv_count} records in the geocoded CSV file", "verbose")
        # Default counts in case no batch IDs were supplied
        geocoded_updated_count = 0
        attempted_updated_count = 0
        with conn.cursor() as cursor:
            # Generate unique temp table names
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            geocoded_temp_table = f"temp_geocoded_data_{timestamp}"
            attempted_temp_table = f"temp_attempted_data_{timestamp}"
            # FIRST TEMP TABLE: Create and populate the geocoded temp table directly from CSV
            log(f"Creating and populating temporary table for geocoded data: {geocoded_temp_table}", "verbose")
            cursor.execute(f"""
                CREATE TEMPORARY TABLE {geocoded_temp_table} (
                    primary_key INTEGER PRIMARY KEY,
                    longitude FLOAT,
                    latitude FLOAT
                )
            """)
            # Use COPY to quickly load CSV data into the temp table
            with open(geocoded_csv, 'r') as f:
                # Skip header row
                next(f)
                # Use COPY command for fast import
                cursor.copy_from(f, geocoded_temp_table, sep=',', columns=('primary_key', 'longitude', 'latitude'))
            # Count records in the temp table
            cursor.execute(f"SELECT COUNT(*) FROM {geocoded_temp_table}")
            geocoded_temp_count = cursor.fetchone()[0]
            log(f"DIAGNOSTIC: Loaded {geocoded_temp_count} records into geocoded temp table", "verbose")
            # SECOND TEMP TABLE: Create and populate temp table for all attempted records
            # Both updates depend on this table, so they run only when batch IDs exist
            if all_processed_ids is not None and len(all_processed_ids) > 0:
                log(f"Creating temporary table for tracking {len(all_processed_ids)} attempted records", "verbose")
                cursor.execute(f"""
                    CREATE TEMPORARY TABLE {attempted_temp_table} (
                        primary_key INTEGER PRIMARY KEY,
                        geolocated VARCHAR(30) DEFAULT 'attempted'
                    )
                """)
                # Prepare values for fast insertion using VALUES clause
                values_str = ','.join(f"({int(id)})" for id in all_processed_ids)
                cursor.execute(f"""
                    INSERT INTO {attempted_temp_table} (primary_key)
                    VALUES {values_str}
                """)
                # Count records in the attempted temp table
                cursor.execute(f"SELECT COUNT(*) FROM {attempted_temp_table}")
                attempted_temp_count = cursor.fetchone()[0]
                log(f"DIAGNOSTIC: Added {attempted_temp_count} records to attempted temp table", "verbose")
                # UPDATE #1: Fast JOIN to update lat/long values - ONLY for current batch IDs
                log("Updating main table with geocoded data using JOIN - restricted to current batch", "verbose")
                cursor.execute(f"""
                    UPDATE {connection_params['schema']}.{connection_params['table']} AS t
                    SET
                        latitude = g.latitude,
                        longitude = g.longitude
                    FROM {geocoded_temp_table} AS g
                    WHERE t.primary_key = g.primary_key
                    AND t.primary_key IN (SELECT primary_key FROM {attempted_temp_table})
                """)
                geocoded_updated_count = cursor.rowcount
                # UPDATE #2: Fast JOIN to update attempted status
                log("Updating main table with attempted status using JOIN", "verbose")
                cursor.execute(f"""
                    UPDATE {connection_params['schema']}.{connection_params['table']} AS t
                    SET
                        geolocated = 'attempted'
                    FROM {attempted_temp_table} AS a
                    WHERE t.primary_key = a.primary_key
                """)
                attempted_updated_count = cursor.rowcount
            # Drop the temporary tables
            log("Dropping temporary tables", "verbose")
            cursor.execute(f"DROP TABLE IF EXISTS {geocoded_temp_table}")
            cursor.execute(f"DROP TABLE IF EXISTS {attempted_temp_table}")
        # Commit the transaction
        conn.commit()
        conn.close()
        log(f"Updated {geocoded_updated_count} records with geocoded data")
        log(f"Updated {attempted_updated_count} records with 'attempted' status")
        return attempted_updated_count, geocoded_updated_count
    except Exception as e:
        log(f"Error importing geocoded data: {e}")
        import traceback
        traceback.print_exc()
        return 0, 0

def main():
    # PostgreSQL connection parameters - enter your details here
    connection_params = {
        'host': 'host',
        'database': 'postgres',
        'schema': 'public',
        'table': 'table',
        'user': 'postgres',
        'password': 'password'
    }
    # Output folder for CSV files
    output_folder = r"C:\User\OneDrive\Documents\Geocoding"
    os.makedirs(output_folder, exist_ok=True)
    # The name used for your Maptitude views
    view_name = "geocoding_view"
    # Batch size for processing
    batch_size = 10000
    # Loop until no more records need processing
    batch_num = 1
    has_more_records = True
    while has_more_records:
        try:
            log(f"\n==== Processing Batch #{batch_num} (Max {batch_size} records) ====\n")
            # Step 1: Export data to CSV - limit to batch_size records
            csv_filename, all_processed_ids, has_more_records = export_data_to_csv(
                connection_params, output_folder, view_name, batch_size)
            if not has_more_records:
                log("No more records to process. Exiting.")
                break
            # Step 2: Geocode data with Maptitude
            geocoded_csv, num_located = geocode_with_maptitude(csv_filename, view_name)
            # Step 3: Import geocoded data back to PostgreSQL
            attempted_updated_count, geocoded_updated_count = import_geocoded_data(
                connection_params, geocoded_csv, all_processed_ids)
            log(f"Batch #{batch_num} completed. Updated {geocoded_updated_count} records with geocoded data, {attempted_updated_count} marked as 'attempted'.")
            # Increment batch counter
            batch_num += 1
            # Add a small delay between batches to allow system resources to recover
            log("Waiting 5 seconds before processing next batch...")
            time.sleep(5)
        except Exception as e:
            log(f"Error processing batch #{batch_num}: {e}")
            import traceback
            traceback.print_exc()
            # If there's an error, wait a bit longer before trying the next batch
            log("Error encountered. Waiting 30 seconds before attempting next batch...")
            time.sleep(30)
            continue
    log("\nGeocoding process completed. All eligible records have been processed.")


if __name__ == "__main__":
    # To toggle between normal and verbose modes, change the LOGGING_LEVEL at the top of the file
    # or uncomment the line below:
    # LOGGING_LEVEL = "verbose"  # For detailed logging
    main()

Conclusion
Batch geocoding in Maptitude makes it possible to process millions of records efficiently by exporting database records in manageable chunks, geocoding them automatically, and writing results back into PostgreSQL. This approach is especially useful when working with large SQL tables that would otherwise be too slow to geocode through a direct database connection.
