#!/usr/bin/env python
"""Add population data structures to an INSEE PostgreSQL schema.

The script:

1. adds a ``population`` INT column to the ``region`` and ``departement``
   tables of the target schema (if the column does not exist yet);
2. creates or replaces the ``PRC_POP_REG_DEP`` procedure, which fills
   ``departement.population`` from ``view_indicateur_dep`` and then fills
   ``region.population`` with the per-``reg`` sum of the departement values;
3. optionally (``--call``) calls that procedure.

Each step runs in its own transaction; a failing step is logged and rolled
back, and the following steps are still attempted.
"""
import sys
import os
import re
# import time
import logging
import argparse as arg
import psycopg2 as pg
from psycopg2.extensions import AsIs
from classes.CustomFormater import CustomFormatter
from classes.Timer import Timer
import locale
locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')


# The two ALTER TABLE statements are executed and committed together.
_ADD_POPULATION_COLUMNS = (
    """
    ALTER TABLE %(schema)s.region
    ADD COLUMN IF NOT EXISTS population INT;
    """,
    """
    ALTER TABLE %(schema)s.departement
    ADD COLUMN IF NOT EXISTS population INT;
    """,
)

# NOTE(review): in the first cursor query, `SUM valeur` is not an aggregate
# call; as written it reads as a column named `sum` aliased to `valeur`.
# Left unchanged on purpose -- confirm against the definition of
# view_indicateur_dep.
_CREATE_PROCEDURE = """
    CREATE OR REPLACE PROCEDURE %(schema)s.PRC_POP_REG_DEP()
    LANGUAGE plpgsql
    AS $$
    DECLARE
        REC RECORD;
    BEGIN
        FOR REC IN (SELECT id_departement, ncc, SUM valeur
                    FROM %(schema)s.view_indicateur_dep v
                    WHERE id_indicateur = 1
                    AND date_debut = '2018') LOOP
            UPDATE %(schema)s.departement
            SET population = REC.valeur
            WHERE id_departement = REC.id_departement;
        END LOOP;

        FOR REC IN (SELECT reg, SUM(population) valeur
                    FROM %(schema)s.departement d
                    GROUP BY reg) LOOP
            UPDATE %(schema)s.region
            SET population = REC.valeur
            WHERE reg = REC.reg;
        END LOOP;
    END;
    $$;"""

_CALL_PROCEDURE = "CALL %(schema)s.PRC_POP_REG_DEP()"


# Schema name is NAMEDATALEN-1 (PGSQL source code)
# -> src/include/pg_config_manual.h
def check_schema_name(arg_value, pat=re.compile(r"^[a-z0-9A-Z]{1,63}\Z")):
    """Validate a schema name given on the command line.

    Args:
        arg_value: candidate schema name.
        pat: compiled pattern the name must match; by default 1 to 63 ASCII
            letters or digits.

    Returns:
        ``arg_value`` unchanged when it matches ``pat``.

    Raises:
        ValueError: if ``arg_value`` does not match ``pat``.
    """
    # The default pattern ends with \Z rather than $, because $ also matches
    # just before a trailing newline and would let "name\n" through.
    if not pat.match(arg_value):
        raise ValueError('invalid schema name: {!r}'.format(arg_value))
    return arg_value


def parse_args():
    """Build the command-line parser and return the parsed arguments."""
    parser = arg.ArgumentParser('Process csv files from INSEE')
    parser.add_argument('--source', '-s',
                        help='csv source directory',
                        default='exports')
    parser.add_argument('--connection-file', '-f',
                        help='Postgresql connexion file',
                        default='.pgconn')
    # The schema name is interpolated into the SQL with AsIs (i.e. without
    # quoting), so it must be validated here: it may not contain any SQL
    # special characters (comment markers, semicolons, ...) that would
    # allow SQL injection.
    parser.add_argument('--schema-name',
                        help='Database schema name',
                        type=check_schema_name,
                        default='insee')
    parser.add_argument('--call', '-c',
                        help='Call the procedure after creating it',
                        action='store_true')

    debug_group = parser.add_mutually_exclusive_group()
    debug_group.add_argument('--verbose', '-V',
                             help='Verbose output',
                             action='store_true')
    debug_group.add_argument('--debug', '-d',
                             help='Activate debug mode',
                             action='store_true')
    return parser.parse_args()


def _configure_logging(args):
    """Attach a console handler to the root logger and set its level."""
    logger = logging.getLogger()
    tty_handler = logging.StreamHandler()
    tty_handler.setFormatter(CustomFormatter())
    logger.addHandler(tty_handler)

    if args.verbose:
        logger.setLevel(logging.INFO)
        logger.info('VERBOSE mode activated')
    if args.debug:
        logger.setLevel(logging.DEBUG)
        logger.debug('DEBUG mode activated')
    return logger


def _run_step(conn, timer, logger, label, error_message, statements, params):
    """Run *statements* as one timed transaction.

    On success the transaction is committed. On failure the error is logged
    using ``error_message`` (a ``str.format`` template receiving the
    exception) and the transaction is rolled back so that later steps can
    still use the connection. The timer is stopped in both cases.
    """
    timer.start(label)
    try:
        with conn.cursor() as curs:
            for statement in statements:
                curs.execute(statement, params)
        conn.commit()
    except Exception as e:
        logger.error(error_message.format(e))
        # Without a rollback the connection stays in an aborted transaction
        # and every subsequent statement would fail as well.
        try:
            conn.rollback()
        except pg.Error as rollback_error:
            logger.error('Rollback failed: {}'.format(rollback_error))
    finally:
        timer.stop()


def main():
    """Entry point: update the tables, add the procedure, optionally call it."""
    args = parse_args()
    logger = _configure_logging(args)

    logger.debug(
        'Import pgsql connection file {}'.format(args.connection_file))
    with open(args.connection_file) as cf:
        pg_conn = cf.read()

    t = Timer(logger=logger.info)
    # AsIs inserts the (already validated) schema name without quoting.
    params = {'schema': AsIs(args.schema_name)}

    conn = pg.connect(pg_conn)
    try:
        logger.debug('Add colums in tables region and departement')
        _run_step(conn, t, logger, 'Update tables',
                  'Something goes wrong updating tables: {}',
                  _ADD_POPULATION_COLUMNS, params)

        logger.debug('Add procedure.')
        _run_step(conn, t, logger, 'Add procedure',
                  'Something goes wrong adding procedure: {}',
                  (_CREATE_PROCEDURE,), params)

        if args.call:
            _run_step(conn, t, logger, 'Call procedure',
                      'something goes wrong on procedure call: {}',
                      (_CALL_PROCEDURE,), params)
    finally:
        conn.close()


if __name__ == '__main__':
    main()