182 lines
6.3 KiB
Python
Executable file
182 lines
6.3 KiB
Python
Executable file
#!/usr/bin/env python
|
|
import sys
|
|
import os
|
|
import re
|
|
# import time
|
|
import logging
|
|
import argparse as arg
|
|
import psycopg2 as pg
|
|
from psycopg2.extensions import AsIs
|
|
|
|
from classes.CustomFormater import CustomFormatter
|
|
from classes.Timer import Timer
|
|
|
|
# A schema name is at most NAMEDATALEN-1 (63) characters long; NAMEDATALEN is
# defined in src/include/pg_config_manual.h of the PostgreSQL source code.
|
|
def check_schema_name(arg_value, pat=re.compile(r"^[a-z0-9A-Z]{1,63}$")):
    """
    Validate a database schema name received from the command line.

    The value is meant to be used as an argparse ``type=`` callable: it is
    returned unchanged when valid, and a ValueError is raised otherwise.
    With the default pattern only 1 to 63 ASCII letters and digits are
    accepted.

    :param arg_value: candidate schema name (string).
    :param pat: compiled regular expression the whole name must match.
    :return: ``arg_value`` unchanged.
    :raises ValueError: if ``arg_value`` does not match ``pat`` entirely.
    """
    # fullmatch() instead of match(): with match(), the trailing ``$`` of the
    # pattern also matches just before a final newline, so a value such as
    # "insee\n" used to be accepted.
    if not pat.fullmatch(arg_value):
        raise ValueError('invalid schema name: {!r}'.format(arg_value))
    return arg_value
|
|
|
|
|
|
def parse_args():
    """
    Build the command line parser and parse the program arguments.

    Supported options:
      --source / -s           csv source directory (default: 'exports')
      --connection-file / -f  PostgreSQL connection file (default: '.pgconn')
      --schema-name           database schema name, validated by
                              check_schema_name (default: 'insee')
      --verbose / -V          verbose output
      --debug / -d            debug mode (mutually exclusive with --verbose)

    :return: the namespace produced by ``ArgumentParser.parse_args()``.
    """
    # The first positional parameter of ArgumentParser is ``prog`` (the
    # program name shown in the usage line), so the descriptive text has to
    # be passed explicitly as ``description``.
    parser = arg.ArgumentParser(description='Process csv files from INSEE')

    parser.add_argument('--source', '-s',
                        help='csv source directory',
                        default='exports')

    parser.add_argument('--connection-file', '-f',
                        help='Postgresql connexion file',
                        default='.pgconn')

    # As we use the AsIs function to avoid quoting ('') the schema name in our
    # SQL queries, we must ensure that it does not contain SQL special
    # characters (comments --, trailing comma, etc.) to avoid SQL injection.
    parser.add_argument('--schema-name',
                        help='Database schema name',
                        type=check_schema_name,
                        default='insee')

    debug_group = parser.add_mutually_exclusive_group()
    debug_group.add_argument('--verbose', '-V',
                             help='Verbose output',
                             action='store_true')
    debug_group.add_argument('--debug', '-d',
                             help='Activate debug mode',
                             action='store_true')

    return parser.parse_args()
|
|
|
|
if __name__ == '__main__':
    # Script entry point: create the schema and its tables, then load each
    # csv file of the source directory into the table of the same name.
    args = parse_args()
    t = Timer()

    # Records of the root logger are written to the terminal through the
    # project's CustomFormatter.
    logger = logging.getLogger()
    tty_handler = logging.StreamHandler()
    tty_handler.setFormatter(CustomFormatter())
    logger.addHandler(tty_handler)

    if args.verbose:
        logger.setLevel(logging.INFO)
        logger.info('VERBOSE mode activated')

    if args.debug:
        logger.setLevel(logging.DEBUG)
        logger.debug('DEBUG mode activated')

    logger.debug('Import pgsql connection file %s', args.connection_file)
    with open(args.connection_file) as cf:
        pg_conn = cf.read()

    # The schema name is injected unquoted (AsIs) into the statements below;
    # it has been validated by the --schema-name argument type.
    schema = {'schema': AsIs(args.schema_name)}

    # Statements are ordered so that every table referenced by a foreign key
    # is created before the table that references it.
    ddl_statements = (
        'CREATE SCHEMA IF NOT EXISTS %(schema)s',
        """
        CREATE TABLE IF NOT EXISTS %(schema)s.region (
            id_region serial PRIMARY KEY,
            reg VARCHAR(2) unique,
            ncc VARCHAR(200),
            libelle VARCHAR(200)
        );""",
        """
        CREATE TABLE IF NOT EXISTS %(schema)s.departement (
            id_departement serial PRIMARY KEY,
            dep VARCHAR(3) unique,
            ncc VARCHAR(200),
            libelle VARCHAR(200),
            reg VARCHAR(2) REFERENCES %(schema)s.region(reg)
        );""",
        """
        CREATE TABLE IF NOT EXISTS %(schema)s.commune (
            id_commune serial PRIMARY KEY,
            com VARCHAR(5) unique,
            ncc VARCHAR(200),
            libelle VARCHAR(200),
            dep VARCHAR(3) REFERENCES %(schema)s.departement(dep),
            superf FLOAT CONSTRAINT SUPERFICIE_POSITIVE CHECK (superf > 0)
        );""",
        """
        CREATE TABLE IF NOT EXISTS %(schema)s.cheflieudepartement(
            dep VARCHAR(3) REFERENCES %(schema)s.departement(dep),
            cheflieudep VARCHAR(5) REFERENCES %(schema)s.commune(com),
            PRIMARY KEY (dep, cheflieudep)
        );""",
        # NOTE(review): reg is VARCHAR(3) here but VARCHAR(2) in region;
        # left unchanged, confirm whether this is intended.
        """
        CREATE TABLE IF NOT EXISTS %(schema)s.cheflieuregion(
            reg VARCHAR(3) REFERENCES %(schema)s.region(reg),
            cheflieureg VARCHAR(5) REFERENCES %(schema)s.commune(com),
            PRIMARY KEY (reg, cheflieureg)
        );""",
        """
        CREATE TABLE IF NOT EXISTS %(schema)s.indicateur(
            id_indicateur INTEGER PRIMARY KEY,
            libelle VARCHAR(200)
        );""",
        """
        CREATE TABLE IF NOT EXISTS %(schema)s.statistique(
            com VARCHAR(5) REFERENCES %(schema)s.commune(com),
            id_indicateur INTEGER REFERENCES %(schema)s.indicateur(id_indicateur),
            date_debut SMALLINT,
            date_fin SMALLINT CONSTRAINT APRES_DATEDEBUT CHECK (date_fin > date_debut),
            valeur float,
            PRIMARY KEY (com,id_indicateur,date_debut)
        );""",
    )

    # (table name, COPY parameters) pairs. An ordered sequence is used on
    # purpose: the foreign keys require parent tables to be loaded first, so
    # the import order must not depend on dict iteration order.
    csvs = (
        ('region', {'col': ('reg', 'ncc', 'libelle'), 'null': ''}),
        ('departement', {'col': ('dep', 'ncc', 'libelle', 'reg'), 'null': ''}),
        ('commune', {'col': ('com', 'ncc', 'libelle', 'dep', 'superf'), 'null': ''}),
        ('cheflieudepartement', {'col': ('cheflieudep', 'dep'), 'null': ''}),
        ('cheflieuregion', {'col': ('cheflieureg', 'reg'), 'null': ''}),
        ('indicateur', {'col': ('id_indicateur', 'libelle'), 'null': ''}),
        ('statistique', {
            'col': (
                'com',
                'id_indicateur',
                'date_debut',
                'date_fin',
                'valeur',
            ),
            'null': 'null',
        }),
    )

    t.start('Create database schema')
    conn = pg.connect(pg_conn)
    # try/finally so the connection is closed even when a statement or a
    # csv import fails.
    try:
        with conn.cursor() as curs:
            for statement in ddl_statements:
                curs.execute(statement, schema)
        t.stop()

        t.start('Import data from csv files')
        # Tables are addressed by their bare name in copy_from, so the
        # target schema is put on the search path first.
        with conn.cursor() as curs:
            curs.execute('SET search_path TO %(schema)s', schema)

        for table, params in csvs:
            logger.debug('Process csv file : %s with %s', table, params)
            csv_path = os.path.join(args.source, table + '.csv')
            with open(csv_path) as f:
                with conn.cursor() as curs:
                    curs.copy_from(
                        f,
                        table,
                        sep=',',
                        columns=params['col'],
                        null=params['null']
                    )
            # One commit per file, as in the original import loop.
            conn.commit()
        t.stop()
    finally:
        conn.close()
|