package org.eclipse.scada.base.pipe.internal;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.scada.base.pipe.PipeService;
import org.eclipse.scada.base.pipe.Producer;
import org.eclipse.scada.base.pipe.Worker;
import org.eclipse.scada.base.pipe.WorkerAlreadyCreated;
import org.eclipse.scada.base.pipe.WorkerHandle;
import org.eclipse.scada.utils.io.RecursiveDeleteVisitor;
import org.eclipse.scada.utils.str.Tables;
import org.osgi.framework.FrameworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/scada/base/pipe/internal/PipeServiceImpl.class */
/**
 * {@link PipeService} implementation that keeps every published event as a
 * file below a storage directory, using one sub-directory per pipe.
 *
 * <p>Event files are named {@code <timestamp>-<counter>-<retry>.evt} with all
 * three parts in hexadecimal (see {@link #makeFile(String, MetaInfo)} and
 * {@link #parse(String)}). One {@link WorkerThread} can be registered per pipe
 * name; it is notified whenever a new event is published for that name.
 *
 * <p>Access to the worker registry and the started flag is guarded by the
 * monitor of this instance.
 */
public class PipeServiceImpl implements PipeService {
    private static final Logger logger = LoggerFactory.getLogger(PipeServiceImpl.class);

    /** Matches event file names; group 1 is the timestamp, group 3 the retry value. */
    private static final Pattern FILE_PATTERN = Pattern.compile("([0-9a-z]+)-([0-9a-z]+)-([0-9a-z]+)\\.evt", Pattern.CASE_INSENSITIVE);

    /** Root directory holding one sub-directory per pipe. */
    private final File storage;

    /** Source of the middle part of event file names. */
    private final AtomicLong counter = new AtomicLong();

    /** Registered worker threads, keyed by the name passed to {@link #createWorker(String, Worker)}. */
    private final Map<String, WorkerThread> workers = new HashMap<>();

    private boolean started;

    /** Handle of the worker created by {@link #startTestWorker(String)}, or {@code null}. */
    private WorkerHandle testWorker;

    /**
     * Values carried in an event file name.
     *
     * <p>The fields are intentionally left non-final: code outside this class
     * may update them.
     */
    public static class MetaInfo {
        /** Value of the first file name part; compared against the current time in milliseconds. */
        long timestamp;

        /** Value of the third file name part. */
        long retry;

        /**
         * @param j  the timestamp value
         * @param j2 the retry value
         */
        public MetaInfo(long j, long j2) {
            this.timestamp = j;
            this.retry = j2;
        }
    }

    /**
     * Creates a service using the directory returned by {@link #getDefaultDirectory()}.
     */
    public PipeServiceImpl() {
        this(getDefaultDirectory());
    }

    /**
     * Creates a service storing its data below the given directory.
     *
     * @param file the storage root; must be an existing directory
     * @throws IllegalStateException if {@code file} is not a directory
     */
    public PipeServiceImpl(File file) {
        if (!file.isDirectory()) {
            throw new IllegalStateException(String.format("'%s' is not a directory", file));
        }
        this.storage = file;
    }

    /**
     * Marks the service as started; calling it again has no further effect.
     */
    public synchronized void start() {
        this.started = true;
    }

    /**
     * Marks the service as stopped, closes every registered worker thread and
     * clears the registry. Does nothing when the service is not started.
     */
    public synchronized void stop() {
        if (!this.started) {
            return;
        }
        this.started = false;
        for (WorkerThread worker : this.workers.values()) {
            worker.close();
        }
        this.workers.clear();
    }

    /**
     * Determines the default storage directory and creates it when missing.
     *
     * <p>The system property {@code org.eclipse.scada.base.pipe.storage} wins
     * when set; otherwise the folder {@code storage} inside the bundle data
     * area is used.
     *
     * @return the existing storage directory
     * @throws IllegalStateException if the bundle data area is unavailable or
     *         the resulting path is not a directory
     */
    public static File getDefaultDirectory() {
        String configured = System.getProperty("org.eclipse.scada.base.pipe.storage", null);
        File directory;
        if (configured != null) {
            directory = new File(configured);
        } else {
            File dataRoot = FrameworkUtil.getBundle(PipeServiceImpl.class).getDataFile((String) null);
            if (dataRoot == null) {
                throw new IllegalStateException("Unable to get root folder of bundle data directory");
            }
            directory = new File(dataRoot, "storage");
        }
        if (!directory.exists()) {
            // The result is not checked here; the isDirectory() test below covers failure.
            directory.mkdirs();
        }
        if (!directory.isDirectory()) {
            throw new IllegalStateException(String.format("'%s' is not a valid directory or could not be created", directory));
        }
        return directory;
    }

    /**
     * Creates a producer whose {@code publish} calls are forwarded to
     * {@link #publishEvent(String, byte[], int)} for the given pipe name.
     *
     * @param str the pipe name
     * @return a new producer bound to that pipe name
     */
    @Override
    public Producer createProducer(final String str) {
        return new Producer() {
            @Override
            public void publish(byte[] bArr, int i) throws IOException {
                PipeServiceImpl.this.publishEvent(str, bArr, i);
            }
        };
    }

    /**
     * Stores one event as a new file in the pipe's directory and notifies the
     * worker registered under the same name, if any.
     *
     * <p>An empty file is created first to reserve the name, the payload is
     * written to a {@code .part} file next to it, and that file is then moved
     * over the reserved one atomically.
     *
     * @param str  the pipe name; URL-encoded to form the directory name
     * @param bArr the event payload
     * @param i    the retry value recorded in the file name
     * @throws IOException if creating, writing or moving the files fails
     * @throws IllegalStateException if the generated file name already exists
     */
    protected void publishEvent(String str, byte[] bArr, int i) throws IOException {
        File eventFile = makeFile(encode(str), new MetaInfo(0L, i));
        logger.trace("Block file: {}", eventFile);
        if (!eventFile.createNewFile()) {
            throw new IllegalStateException(String.format("File '%s' already exists", eventFile));
        }
        File partFile = new File(eventFile.getAbsolutePath() + ".part");
        logger.trace("Part file: {}", partFile);
        boolean committed = false;
        try {
            Files.write(partFile.toPath(), bArr, StandardOpenOption.CREATE_NEW);
            Files.move(partFile.toPath(), eventFile.toPath(), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
            committed = true;
            logger.trace("Created data file: {} bytes", bArr.length);
            notifyConsumer(str);
        } finally {
            if (partFile.exists()) {
                logger.trace("Delete part file: {}", partFile);
                partFile.delete();
            }
            if (!committed) {
                // The payload never reached the reserved file; remove the empty
                // placeholder so it does not stay in the queue directory forever.
                logger.trace("Delete block file: {}", eventFile);
                eventFile.delete();
            }
        }
    }

    /**
     * Tells the worker thread registered under the given name, if there is
     * one, that a new event is available.
     */
    private synchronized void notifyConsumer(String str) {
        WorkerThread worker = this.workers.get(str);
        if (worker == null) {
            return;
        }
        logger.debug("Notify consumer thread: {} -> {}", str, worker);
        worker.notifyNewEvent();
    }

    /**
     * Builds a new, unique event file name inside the given queue directory.
     *
     * @param str      the queue directory name (created when missing)
     * @param metaInfo timestamp and retry to encode; when {@code null} or when
     *                 its timestamp is not positive the current time is used,
     *                 and a {@code null} value yields retry {@code 0}
     * @return the file object; the file itself is not created
     * @throws IllegalStateException if the queue directory cannot be created
     */
    public File makeFile(String str, MetaInfo metaInfo) {
        File queueDir = getQueueDir(str);
        long timestamp = (metaInfo == null || metaInfo.timestamp <= 0) ? System.currentTimeMillis() : metaInfo.timestamp;
        long sequence = this.counter.incrementAndGet();
        long retry = metaInfo == null ? 0L : metaInfo.retry;
        return new File(queueDir, String.format("%016x-%016x-%08x.evt", timestamp, sequence, retry));
    }

    /**
     * Returns the directory {@code str} below the storage root, creating it
     * when it does not exist yet.
     *
     * @throws IllegalStateException if the path is not a directory afterwards
     */
    private File getQueueDir(String str) {
        File queueDir = new File(this.storage, str);
        if (!queueDir.exists()) {
            queueDir.mkdir();
        }
        if (!queueDir.isDirectory()) {
            throw new IllegalStateException(String.format("Unable to create queue dir: '%s'", queueDir));
        }
        return queueDir;
    }

    /**
     * Lists the entries of a directory, treating a failed listing as empty.
     *
     * <p>{@link File#listFiles()} returns {@code null} when the directory
     * cannot be read; callers iterate and sort the result, so an empty array
     * is returned instead.
     */
    private static File[] listEntries(File dir) {
        File[] entries = dir.listFiles();
        if (entries == null) {
            logger.warn("Unable to list directory: {}", dir);
            return new File[0];
        }
        return entries;
    }

    /**
     * URL-encodes a pipe name using UTF-8; returns the input unchanged if the
     * encoding is reported as unsupported.
     */
    private String encode(String str) {
        try {
            return URLEncoder.encode(str, "UTF-8");
        } catch (UnsupportedEncodingException ignored) {
            return str;
        }
    }

    /**
     * Registers and starts a worker thread for the given name.
     *
     * @param str    the name to register the worker under
     * @param worker the worker handed to the new thread
     * @return a handle whose {@code close()} unregisters and closes the thread
     * @throws WorkerAlreadyCreated if a worker is already registered for {@code str}
     */
    @Override
    public synchronized WorkerHandle createWorker(final String str, Worker worker) throws WorkerAlreadyCreated {
        if (this.workers.containsKey(str)) {
            throw new WorkerAlreadyCreated(String.format("The consumer '%s' has already been created", str));
        }
        WorkerHandle handle = new WorkerHandle() {
            @Override
            public void close() {
                PipeServiceImpl.this.performClose(str);
            }
        };
        WorkerThread thread = new WorkerThread(this, worker, str);
        thread.setName("PipeWorkerThread/" + str);
        this.workers.put(str, thread);
        thread.start();
        return handle;
    }

    /**
     * Collects readable, non-empty {@code .evt} files that are due into
     * {@code list}, in file name order.
     *
     * <p>A file is due when the timestamp in its name is not later than the
     * current time. Files with an unparsable name are logged and skipped.
     *
     * @param str  the queue directory name, used as given
     * @param i    collection limit; values {@code <= 0} disable it
     * @param list receives the due files
     * @return the earliest timestamp among the postponed files seen, or
     *         {@code null} when none was postponed
     */
    public Long fetchNextEvents(String str, int i, List<File> list) {
        // NOTE(review): the directory name is used as given here, while
        // publishEvent URL-encodes the pipe name - confirm callers pass a
        // matching value.
        File[] entries = listEntries(getQueueDir(str));
        Arrays.sort(entries);
        long now = System.currentTimeMillis();
        Long nextDue = null;
        for (File entry : entries) {
            if (!entry.isFile() || !entry.canRead()) {
                continue;
            }
            if (entry.getName().endsWith(".evt") && entry.length() > 0) {
                logger.trace("Preparing: {}", entry);
                MetaInfo info = parse(entry.getName());
                if (info == null) {
                    logger.info("Broken file name: {}", entry.getName());
                } else if (info.timestamp > now) {
                    long due = info.timestamp;
                    if (nextDue == null || nextDue.longValue() > due) {
                        if (logger.isTraceEnabled()) {
                            logger.trace("Setting end time to {} (in {} ms)", due, due - now);
                        }
                        nextDue = Long.valueOf(due);
                    }
                    logger.debug("Postponed item: {}", entry.getName());
                } else {
                    list.add(entry);
                }
            }
            // NOTE(review): with "size() > i" up to i + 1 files are collected
            // before returning - confirm whether that is intended.
            if (i > 0 && list.size() > i) {
                return nextDue;
            }
        }
        return nextDue;
    }

    /**
     * Extracts timestamp and retry from an event file name.
     *
     * @param str the bare file name
     * @return the parsed values, or {@code null} when the name does not have
     *         the expected form
     */
    public static MetaInfo parse(String str) {
        Matcher matcher = FILE_PATTERN.matcher(str);
        if (!matcher.matches()) {
            return null;
        }
        try {
            return new MetaInfo(Long.parseLong(matcher.group(1), 16), Long.parseLong(matcher.group(3), 16));
        } catch (NumberFormatException ignored) {
            // The pattern also admits letters beyond 'f' and values too large
            // for a long; report those as a broken name instead of throwing.
            return null;
        }
    }

    /**
     * Feeds every readable, non-empty {@code .evt} file of the queue to the
     * worker, in file name order, deleting each file afterwards.
     *
     * <p>An exception thrown by the worker is logged and the file is deleted
     * regardless.
     *
     * @param worker receives the content of each file
     * @param str    the queue directory name, used as given
     * @throws IOException if reading or deleting a file fails
     */
    public void processNextEvent(Worker worker, String str) throws IOException {
        File[] entries = listEntries(getQueueDir(str));
        Arrays.sort(entries);
        for (File entry : entries) {
            boolean candidate = entry.isFile() && entry.canRead() && entry.getName().endsWith(".evt") && entry.length() > 0;
            if (!candidate) {
                continue;
            }
            logger.trace("Processing: {}", entry);
            try {
                worker.work(Files.readAllBytes(entry.toPath()));
            } catch (Exception e) {
                logger.info("Worker failed", e);
            }
            Files.delete(entry.toPath());
        }
    }

    /**
     * Unregisters the worker thread stored under the given name and closes it;
     * does nothing when no such worker exists.
     */
    protected synchronized void performClose(String str) {
        WorkerThread removed = this.workers.remove(str);
        if (removed != null) {
            removed.close();
        }
    }

    /**
     * Prints the URL-decoded name of every directory below the storage root
     * to {@code System.out}.
     */
    public void pipes() {
        for (File entry : listEntries(this.storage)) {
            if (entry.isDirectory()) {
                System.out.println(decode(entry.getName()));
            }
        }
    }

    /**
     * Prints a table of the event files in a queue directory to
     * {@code System.out}: name, size, last-modified date, scheduled time and
     * retry value. Empty files and files with an unparsable name are left out.
     *
     * @param str the queue directory name, used as given
     */
    public void list(String str) {
        File queueDir = getQueueDir(str);
        List<List<String>> rows = new LinkedList<>();
        File[] entries = listEntries(queueDir);
        Arrays.sort(entries);
        for (File entry : entries) {
            if (!entry.isFile() || entry.length() <= 0) {
                continue;
            }
            MetaInfo info = parse(entry.getName());
            if (info == null) {
                continue;
            }
            List<String> row = new LinkedList<>();
            row.add(entry.getName());
            row.add(String.valueOf(entry.length()));
            row.add(String.format("%tc", new Date(entry.lastModified())));
            row.add(String.format("%tc", new Date(info.timestamp)));
            row.add(String.valueOf(info.retry));
            rows.add(row);
        }
        Tables.showTable(System.out, Arrays.asList("Name", "Size", "Entry Date", "Schedule", "Retry"), rows, 3);
    }

    /**
     * URL-decodes a directory name using UTF-8; returns the input unchanged if
     * the encoding is reported as unsupported.
     */
    private String decode(String str) {
        try {
            return URLDecoder.decode(str, "UTF-8");
        } catch (UnsupportedEncodingException ignored) {
            return str;
        }
    }

    /**
     * Registers a diagnostic worker that prints every event to
     * {@code System.out}.
     *
     * <p>The payload is decoded as UTF-8 and split at spaces:
     * {@code sleep <seconds>} makes the worker sleep, and a first word of
     * {@code error} makes it throw a {@link RuntimeException}.
     *
     * @param str the name to register the worker under
     * @throws WorkerAlreadyCreated if a worker is already registered for {@code str}
     */
    public void startTestWorker(String str) throws WorkerAlreadyCreated {
        final PrintStream out = System.out;
        this.testWorker = createWorker(str, new Worker() {
            @Override
            public void work(byte[] bArr) {
                // Print the bytes themselves; passing the array straight to %s
                // would only show its identity string.
                out.format("Pipe event - size: %s, data: %s%n", bArr.length, Arrays.toString(bArr));
                out.flush();
                String text = StandardCharsets.UTF_8.decode(ByteBuffer.wrap(bArr)).toString();
                out.format("\tString: %s%n", text);
                out.flush();
                String[] parts = text.split(" ");
                if (parts.length > 1 && "sleep".equals(parts[0])) {
                    sleepSeconds(parts[1]);
                    return;
                }
                if (parts.length > 0 && "error".equals(parts[0])) {
                    throw new RuntimeException("Test Exception");
                }
            }

            /** Sleeps for the given number of seconds, reporting progress on the stream. */
            private void sleepSeconds(String seconds) {
                try {
                    out.println("Sleeping...");
                    out.flush();
                    // Multiply as long so large values cannot overflow int.
                    Thread.sleep(Integer.parseInt(seconds) * 1000L);
                    out.println("Sleeping... done!");
                    out.flush();
                } catch (InterruptedException e) {
                    e.printStackTrace(out);
                    // Keep the interrupt visible to the thread running this worker.
                    Thread.currentThread().interrupt();
                } catch (NumberFormatException e) {
                    e.printStackTrace(out);
                }
            }
        });
    }

    /**
     * Closes the worker created by {@link #startTestWorker(String)}, if any,
     * and forgets its handle so that a repeated call cannot close a worker
     * registered later under the same name.
     */
    public void closeTestWorker() {
        WorkerHandle handle = this.testWorker;
        if (handle != null) {
            this.testWorker = null;
            handle.close();
        }
    }

    /**
     * Publishes a string as a UTF-8 encoded event with a retry value of 2.
     *
     * @param str  the pipe name
     * @param str2 the text to publish
     * @throws IOException if storing the event fails
     */
    public void testPublish(String str, String str2) throws IOException {
        // getBytes yields exactly the encoded bytes; the backing array of an
        // encoded ByteBuffer may be longer than its content.
        publishEvent(str, str2.getBytes(StandardCharsets.UTF_8), 2);
    }

    /**
     * Removes a queue directory with all its content.
     *
     * <p>The directory is first renamed atomically to an unused sibling name
     * ({@code <name>-<n>}) and then deleted recursively.
     *
     * @param str the queue directory name, used as given
     * @throws IOException if the rename or the recursive delete fails
     */
    public void drop(String str) throws IOException {
        File queueDir = getQueueDir(str);
        // Resolve the temporary name next to the queue directory; a bare
        // relative name would point into the process working directory.
        File parent = queueDir.getParentFile();
        File target;
        int suffix = 0;
        do {
            target = new File(parent, queueDir.getName() + "-" + suffix);
            suffix++;
        } while (target.exists());
        Files.move(queueDir.toPath(), target.toPath(), StandardCopyOption.ATOMIC_MOVE);
        Files.walkFileTree(target.toPath(), new RecursiveDeleteVisitor());
    }
}
