#!/usr/bin/env python
"""Create the INSEE schema in PostgreSQL and load it from CSV exports.

Reads a libpq connection string from a file, creates the target schema
and its tables (region, departement, commune, chief towns, indicateur,
statistique) if they do not already exist, then bulk-loads every table
from the matching ``<source>/<table>.csv`` file with COPY.  Everything
runs in a single transaction: one commit at the end, and the connection
is always closed, even on failure.
"""
import sys
import os
import re
import logging
import argparse as arg

import psycopg2 as pg
from psycopg2.extensions import AsIs

from classes.CustomFormater import CustomFormatter
from classes.Timer import Timer


# Schema name max length is NAMEDATALEN-1 = 63 (PostgreSQL sources,
# src/include/pg_config_manual.h).
def check_schema_name(arg_value, pat=re.compile(r"^[a-z0-9A-Z]{1,63}$")):
    """Validate the schema name given on the command line.

    The name is later spliced verbatim into SQL through ``AsIs()``, so
    it must contain nothing but ASCII letters and digits (no quotes, no
    SQL comments ``--``, no commas, ...) to prevent SQL injection.

    :param arg_value: candidate schema name from the command line
    :param pat: pre-compiled validation pattern (not meant to be overridden)
    :return: ``arg_value`` unchanged when it is valid
    :raises ValueError: when the name is rejected; argparse turns this
        into a readable "invalid value" error for the user
    """
    if not pat.match(arg_value):
        # Keep ValueError (argparse handles it, and direct callers may
        # catch it) but attach a message instead of raising it bare.
        raise ValueError(
            'invalid schema name: {!r} (expected 1-63 ASCII '
            'letters/digits)'.format(arg_value))
    return arg_value


def parse_args():
    """Define and parse the command-line arguments.

    :return: the populated ``argparse.Namespace``
    """
    # BUG FIX: the description used to be passed positionally, which set
    # ArgumentParser's *prog* (the name shown in the usage line) instead
    # of its description.
    parser = arg.ArgumentParser(description='Process csv files from INSEE')
    parser.add_argument('--source', '-s',
                        help='csv source directory',
                        default='exports')
    parser.add_argument('--connection-file', '-f',
                        help='Postgresql connexion file',
                        default='.pgconn')
    # As we use AsIs() to interpolate the schema name without quoting,
    # we must ensure it contains no SQL special characters (comments
    # --, commas, quotes, ...) to avoid SQL injection; see
    # check_schema_name above.
    parser.add_argument('--schema-name',
                        help='Database schema name',
                        type=check_schema_name,
                        default='insee')
    debug_group = parser.add_mutually_exclusive_group()
    debug_group.add_argument('--verbose', '-V',
                             help='Verbose output',
                             action='store_true')
    debug_group.add_argument('--debug', '-d',
                             help='Activate debug mode',
                             action='store_true')
    return parser.parse_args()


def create_schema(conn, schema):
    """Create the schema and all its tables when they do not exist.

    Nothing is committed here; the caller owns the transaction.

    :param conn: open psycopg2 connection
    :param schema: schema name already validated by check_schema_name,
        hence safe to interpolate with AsIs()
    """
    params = {'schema': AsIs(schema)}
    with conn.cursor() as curs:
        curs.execute('CREATE SCHEMA IF NOT EXISTS %(schema)s', params)
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.region (
                id_region serial PRIMARY KEY,
                reg VARCHAR(2) unique,
                ncc VARCHAR(200),
                libelle VARCHAR(200)
            );""", params)
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.departement (
                id_departement serial PRIMARY KEY,
                dep VARCHAR(3) unique,
                ncc VARCHAR(200),
                libelle VARCHAR(200),
                reg VARCHAR(2) REFERENCES %(schema)s.region(reg)
            );""", params)
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.commune (
                id_commune serial PRIMARY KEY,
                com VARCHAR(5) unique,
                ncc VARCHAR(200),
                libelle VARCHAR(200),
                dep VARCHAR(3) REFERENCES %(schema)s.departement(dep),
                superf FLOAT
                    CONSTRAINT SUPERFICIE_POSITIVE CHECK (superf > 0)
            );""", params)
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.cheflieudepartement(
                dep VARCHAR(3) REFERENCES %(schema)s.departement(dep),
                cheflieudep VARCHAR(5) REFERENCES %(schema)s.commune(com),
                PRIMARY KEY (dep, cheflieudep)
            );""", params)
        # NOTE(review): reg is VARCHAR(3) here but VARCHAR(2) in region;
        # PostgreSQL accepts the FK anyway, but the widths look like a
        # typo — confirm before tightening.
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.cheflieuregion(
                reg VARCHAR(3) REFERENCES %(schema)s.region(reg),
                cheflieureg VARCHAR(5) REFERENCES %(schema)s.commune(com),
                PRIMARY KEY (reg, cheflieureg)
            );""", params)
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.indicateur(
                id_indicateur INTEGER PRIMARY KEY,
                libelle VARCHAR(200)
            );""", params)
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.statistique(
                com VARCHAR(5) REFERENCES %(schema)s.commune(com),
                id_indicateur INTEGER
                    REFERENCES %(schema)s.indicateur(id_indicateur),
                date_debut SMALLINT,
                date_fin SMALLINT
                    CONSTRAINT APRES_DATEDEBUT CHECK (date_fin > date_debut),
                valeur float,
                PRIMARY KEY (com,id_indicateur,date_debut)
            );""", params)


def import_csv_files(conn, schema, source):
    """COPY each table's rows from ``<source>/<table>.csv``.

    The search_path is set first so copy_from() resolves bare table
    names inside the target schema.  Nothing is committed here; the
    caller owns the transaction.

    :param conn: open psycopg2 connection
    :param schema: schema name already validated by check_schema_name
    :param source: directory containing the csv exports
    """
    logger = logging.getLogger()
    with conn.cursor() as curs:
        # Same AsIs style as the DDL: the name was validated, and this
        # keeps quoting consistent across the script.
        curs.execute('SET search_path TO %s', (AsIs(schema),))
    # Per-table COPY configuration: column order in the csv file, and
    # the string that encodes NULL (statistique uses a literal 'null').
    csvs = {
        'region': {'col': ('reg', 'ncc', 'libelle'), 'null': ''},
        'departement': {'col': ('dep', 'ncc', 'libelle', 'reg'), 'null': ''},
        'commune': {'col': ('com', 'ncc', 'libelle', 'dep', 'superf'),
                    'null': ''},
        'cheflieudepartement': {'col': ('cheflieudep', 'dep'), 'null': ''},
        'cheflieuregion': {'col': ('cheflieureg', 'reg'), 'null': ''},
        'indicateur': {'col': ('id_indicateur', 'libelle'), 'null': ''},
        'statistique': {
            'col': (
                'com',
                'id_indicateur',
                'date_debut',
                'date_fin',
                'valeur'
            ),
            'null': 'null'
        }
    }
    for csv, params in csvs.items():
        logger.debug('Process csv file : {} with {}'.format(csv, params))
        with open(os.path.join(source, csv + '.csv')) as f:
            with conn.cursor() as curs:
                curs.copy_from(
                    f,
                    csv,
                    sep=',',
                    columns=params['col'],
                    null=params['null']
                )


if __name__ == '__main__':
    args = parse_args()

    logger = logging.getLogger()
    # Console handler with the project's custom formatter.
    tty_handler = logging.StreamHandler()
    tty_handler.setFormatter(CustomFormatter())
    logger.addHandler(tty_handler)
    if args.verbose:
        logger.setLevel(logging.INFO)
        logger.info('VERBOSE mode activated')
    if args.debug:
        logger.setLevel(logging.DEBUG)
        logger.debug('DEBUG mode activated')

    t = Timer(logger=logger.info)

    logger.debug('Import pgsql connection file {}'.format(args.connection_file))
    with open(args.connection_file) as cf:
        pg_conn = cf.read()

    t.start('Create database schema')
    conn = pg.connect(pg_conn)
    try:
        # BUG FIX: the connection used to leak if any step below raised;
        # close it unconditionally.
        create_schema(conn, args.schema_name)
        t.stop()

        t.start('Import data from csv files')
        import_csv_files(conn, args.schema_name, args.source)
        # Single commit: schema creation and all loads succeed or fail
        # together, as in the original script.
        conn.commit()
        t.stop()
    finally:
        conn.close()