#!/usr/bin/env python import sys import os import re # import time import logging import argparse as arg import psycopg2 as pg from psycopg2.extensions import AsIs from classes.CustomFormater import CustomFormatter from classes.Timer import Timer import locale locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8') # Schema name is NAMEDATALEN-1 (PGSQL source code) # -> src/include/pg_config_manual.h def check_schema_name(arg_value, pat=re.compile(r"^[a-z0-9A-Z]{1,63}$")): if not pat.match(arg_value): raise ValueError return arg_value def parse_args(): """ Parse arguments """ parser = arg.ArgumentParser('Process csv files from INSEE') parser.add_argument('--source', '-s', help='csv source directory', default='exports') parser.add_argument('--connection-file', '-f', help='Postgresql connexion file', default='.pgconn' ) # As whe use AsIs function to not include '' in our sql queries for schema # name, me mus ensure that it is not contains SQL special characters # like comments --, final coma etc and avoid SQL injection parser.add_argument('--schema-name', help='Database schema name', type=check_schema_name, default='insee' ) parser.add_argument('--test', '-t', help='test view, get 10 record of each', action='store_true' ) debug_group = parser.add_mutually_exclusive_group() debug_group.add_argument('--verbose', '-V', help='Verbose output', action='store_true') debug_group.add_argument('--debug', '-d', help='Activate debug mode', action='store_true') return parser.parse_args() if __name__ == '__main__': args = parse_args() #logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger() tty_handler = logging.StreamHandler() # create console handler with a higher log level tty_handler.setFormatter(CustomFormatter()) logger.addHandler(tty_handler) if args.verbose is True: logger.setLevel(logging.INFO) logger.info('VERBOSE mode activated') if args.debug is True: logger.setLevel(logging.DEBUG) logger.debug('DEBUG mode activated') logging.debug('Import pgsql connection file {}'.format(args.connection_file)) with open(args.connection_file) as cf: pg_conn = cf.read() t = Timer(logger=logger.info) conn = pg.connect(pg_conn) with conn.cursor() as curs: logger.debug('') t.start('Create view maj_pop') try: curs.execute(""" CREATE OR REPLACE VIEW %(schema)s.VIEW_MAJ_POP AS SELECT id_departement, com, RANG, valeur, date_debut FROM ( SELECT id_departement, d.dep, c.com, d.ncc, s.ID_INDICATEUR, RANK() OVER(PARTITION BY id_departement ORDER BY date_debut DESC) RANG, SUM(s.valeur) valeur, date_debut FROM %(schema)s.departement d INNER JOIN %(schema)s.commune c ON c.dep = d.dep INNER JOIN %(schema)s.statistique s ON s.com = c.com WHERE id_indicateur = 1 GROUP BY id_departement, d.dep, s.date_debut, d.ncc, s.id_indicateur, c.com ORDER BY id_departement, id_indicateur, c.com) TB WHERE RANG = 1; """, {'schema':AsIs(args.schema_name)}) conn.commit() except Exception as e: logger.error('Something goes wrong on create view maj_pop: {}'.format(e)) sys.exit(1) t.stop() logger.debug('Add procedure to modify pop. values on region and dep') t.start('Add fun_maj_pop() procedure') try: curs.execute(""" CREATE OR REPLACE PROCEDURE %(schema)s.PRC_POP_REG_DEP_2() LANGUAGE plpgsql AS $$ DECLARE REC RECORD; V_POP INTEGER; V_DEP INTEGER; V_REG INTEGER; BEGIN ALTER TABLE %(schema)s.region DISABLE TRIGGER TRG_BLOQ_MAJ_REG; ALTER TABLE %(schema)s.departement DISABLE TRIGGER TRG_BLOQ_MAJ_DEP; UPDATE %(schema)s.departement SET population = 0; UPDATE %(schema)s.region SET population = 0; FOR REC IN (SELECT id_departement, valeur,date_debut FROM (SELECT id_departement,NCC,SUM valeur, date_debut, RANK() OVER(PARTITION BY id_departement ORDER BY date_debut DESC) RANG FROM %(schema)s.VIEW_INDICATEUR_DEP v WHERE id_indicateur = 1) TB WHERE RANG = 1) LOOP SELECT COUNT(COM),id_departement INTO V_POP,V_DEP FROM %(schema)s.VIEW_MAJ_POP WHERE id_departement = REC.id_departement AND valeur = 0 OR valeur IS NULL GROUP BY id_departement; IF NOT FOUND THEN V_DEP = 999; END IF; UPDATE %(schema)s.departement SET population = REC.valeur WHERE id_departement= REC.id_departement AND id_departement != V_DEP; END LOOP; FOR REC IN (SELECT id_region,SUM(d.population) valeur FROM %(schema)s.departement d INNER JOIN %(schema)s.region r ON r.REG = d.REG GROUP BY id_region) LOOP SELECT id_region,COUNT(d.DEP) INTO V_REG,V_POP FROM %(schema)s.departement d INNER JOIN %(schema)s.region r ON r.REG = d.REG WHERE id_region = REC.id_region AND d.population = 0 OR d.population IS NULL GROUP BY id_region; IF NOT FOUND THEN V_REG = 99; END IF; UPDATE %(schema)s.region SET population = REC.valeur WHERE id_region = REC.id_region AND id_region != V_REG; END LOOP; ALTER TABLE %(schema)s.region ENABLE TRIGGER TRG_BLOQ_MAJ_REG; ALTER TABLE %(schema)s.departement ENABLE TRIGGER TRG_BLOQ_MAJ_DEP; END; $$; """, {'schema':AsIs(args.schema_name)}) conn.commit() except Exception as e: logger.error( 'Something goes wrong on fun_maj_pop procedure: {}'.format(e) ) sys.exit(1) t.stop() logger.debug('Create procedure fun_maj_pop_2') t.start('Add fun_maj_pop_2()') try: curs.execute(""" CREATE OR REPLACE FUNCTION %(schema)s.FUN_MAJ_POP_2() RETURNS TRIGGER AS $maj_population_2$ BEGIN ALTER TABLE %(schema)s.REGION DISABLE TRIGGER TRG_BLOQ_MAJ_REG; ALTER TABLE %(schema)s.DEPARTEMENT DISABLE TRIGGER TRG_BLOQ_MAJ_DEP; IF(TG_OP = 'INSERT' AND NEW.ID_INDICATEUR = 1 AND (NEW.VALEUR <> 0 OR NEW.VALEUR <> NULL)) THEN CALL %(schema)s.PRC_POP_REG_DEP_2(); END IF; IF(TG_OP = 'UPDATE' AND NEW.ID_INDICATEUR = 1 AND NEW.VALEUR <> OLD.VALEUR) THEN CALL %(schema)s.PRC_POP_REG_DEP_2(); END IF; IF(TG_OP = 'DELETE' AND OLD.ID_INDICATEUR = 1 AND (OLD.VALEUR <> 0 OR OLD.VALEUR <> NULL)) THEN CALL %(schema)s.PRC_POP_REG_DEP_2(); END IF; ALTER TABLE %(schema)s.REGION ENABLE TRIGGER TRG_BLOQ_MAJ_REG; ALTER TABLE %(schema)s.DEPARTEMENT ENABLE TRIGGER TRG_BLOQ_MAJ_DEP; RETURN NULL; END; $maj_population_2$ language plpgsql; """, {'schema':AsIs(args.schema_name)}) conn.commit() except Exception as e: logger.error( 'Something goes wrong on create fun_maj_pop_2: {}'.format(e) ) sys.exit(1) t.stop() logger.debug('Create trigger maj_pop_2') t.start('Add trigger maj_pop_2') try: curs.execute(""" CREATE TRIGGER TRG_MAJ_POP_2 AFTER INSERT OR UPDATE OR DELETE ON %(schema)s.STATISTIQUE FOR EACH ROW EXECUTE PROCEDURE %(schema)s.FUN_MAJ_POP_2(); """, {'schema':AsIs(args.schema_name)}) conn.commit() except Exception as e: logger.error( 'Something goes wrong on create fun_maj_pop_2: {}'.format(e) ) sys.exit(1) t.stop()