#!/usr/bin/env python
import sys
import os
import re
# import time
import logging
import argparse as arg
import psycopg2 as pg
from psycopg2.extensions import AsIs
from classes.CustomFormater import CustomFormatter
from classes.Timer import Timer
import locale

locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')


# A schema name is at most NAMEDATALEN-1 characters (see the PostgreSQL
# source code: src/include/pg_config_manual.h)
def check_schema_name(arg_value, pat=re.compile(r"^[a-z0-9A-Z]{1,63}$")):
    if not pat.match(arg_value):
        raise ValueError('invalid schema name: {}'.format(arg_value))
    return arg_value


def parse_args():
    """ Parse arguments """
    parser = arg.ArgumentParser('Process csv files from INSEE')
    parser.add_argument('--source', '-s',
                        help='csv source directory',
                        default='exports')
    parser.add_argument('--connection-file', '-f',
                        help='PostgreSQL connection file',
                        default='.pgconn'
                        )
    # As we use the AsIs adapter to interpolate the schema name into our SQL
    # queries without quoting, we must ensure it contains no SQL special
    # characters (comments --, commas, etc.) to avoid SQL injection.
    parser.add_argument('--schema-name',
                        help='Database schema name',
                        type=check_schema_name,
                        default='insee'
                        )
    parser.add_argument('--test', '-t',
                        help='test views, get 10 records of each',
                        action='store_true'
                        )
    debug_group = parser.add_mutually_exclusive_group()
    debug_group.add_argument('--verbose', '-V',
                             help='Verbose output',
                             action='store_true')
    debug_group.add_argument('--debug', '-d',
                             help='Activate debug mode',
                             action='store_true')
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    # logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger()
    # Create a console handler with a custom formatter
    tty_handler = logging.StreamHandler()
    tty_handler.setFormatter(CustomFormatter())
    logger.addHandler(tty_handler)

    if args.verbose is True:
        logger.setLevel(logging.INFO)
        logger.info('VERBOSE mode activated')
    if args.debug is True:
        logger.setLevel(logging.DEBUG)
        logger.debug('DEBUG mode activated')

    t = Timer(logger=logger.info)

    logger.debug('Import pgsql connection file {}'.format(args.connection_file))
    with open(args.connection_file) as cf:
        pg_conn = cf.read()

    t.start('Add Views')
    conn = pg.connect(pg_conn)
    with conn.cursor() as curs:
        logger.debug('Add view_indicateur_dep view')
        try:
            curs.execute("""
                CREATE OR REPLACE VIEW %(schema)s.view_indicateur_dep AS
                SELECT id_departement, d.dep, d.ncc, s.id_indicateur,
                       s.date_debut, SUM(s.valeur)
                FROM %(schema)s.departement d
                INNER JOIN %(schema)s.commune c ON c.dep = d.dep
                INNER JOIN %(schema)s.statistique s ON s.com = c.com
                GROUP BY id_departement, d.dep, s.date_debut, d.ncc,
                         s.id_indicateur
                ORDER BY id_indicateur;""",
                         {'schema': AsIs(args.schema_name)})
            conn.commit()
        except Exception as e:
            logger.error('Can\'t add view view_indicateur_dep: {}'.format(e))
            sys.exit(1)

        logger.debug('Add view_indicateur_reg view')
        try:
            curs.execute("""
                CREATE OR REPLACE VIEW %(schema)s.view_indicateur_reg AS
                SELECT id_region, r.reg, r.ncc, s.id_indicateur,
                       s.date_debut, SUM(s.valeur)
                FROM %(schema)s.region r
                INNER JOIN %(schema)s.departement d ON d.reg = r.reg
                INNER JOIN %(schema)s.commune c ON c.dep = d.dep
                INNER JOIN %(schema)s.statistique s ON s.com = c.com
                GROUP BY id_region, r.reg, s.date_debut, r.ncc,
                         s.id_indicateur
                ORDER BY id_indicateur;""",
                         {'schema': AsIs(args.schema_name)})
            conn.commit()
        except Exception as e:
            logger.error('Can\'t add view view_indicateur_reg: {}'.format(e))
            sys.exit(1)
        t.stop()

        if args.test:
            for view in ('view_indicateur_dep', 'view_indicateur_reg'):
                curs.execute("""
                    SELECT v.ncc, v.date_debut, v.sum
                    FROM %(schema)s.%(view)s v LIMIT 10;""",
                             {'schema': AsIs(args.schema_name),
                              'view': AsIs(view)})
                data = curs.fetchall()
                print('\nCheck {}:'.format(view))
                for row in data:
                    print('\t{:.<40}{}: {:>7n}'.format(*row))
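
# ---------------------------------------------------------------------------
# Usage sketch (assumptions, not part of the original script): psycopg2's
# connect() accepts a libpq keyword/value connection string, so the .pgconn
# file read above could look like the line below. Host, port, database name,
# and credentials are placeholders; adapt them to your environment.
#
#   host=localhost port=5432 dbname=insee user=insee password=secret
#
# Example invocation (script filename hypothetical):
#
#   python add_views.py --schema-name insee --test -V
# ---------------------------------------------------------------------------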