All files / src/hooks useLiveStream.ts

93.02% Statements 40/43
71.87% Branches 23/32
100% Functions 9/9
100% Lines 38/38

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116                            4x                                                                       3x 3x 3x 3x 3x       3x     3x           3x 3x       3x 3x   3x 3x 2x   2x 3x   3x 4x 4x 4x 4x 4x     3x 3x   3x 2x 2x 2x   2x 1x 1x 1x 1x 1x 1x   1x         3x        
// --------------------------------------------------------------------
// src/hooks/useLiveStream.ts
// --------------------------------------------------------------------
// Change: refactor from "always-on hook" to a conditional child component
//         (LiveStreamDriver) that only mounts when live streaming is enabled.
//         This guarantees no subscription is created when live data is OFF.
//         Also: rename `limit` ➜ `maxPoints` for consistency with history.
// --------------------------------------------------------------------
import React, { useEffect, useMemo, useRef } from 'react';
import { useStream } from '@ska-octopus-widget-sdk/widget-sdk';
import { OP_STREAM } from '../graphql/stream';
import type { Pt } from './useInitialHistory';
 
function canonicalAttr(name: string): string {
  return name.replace(/^tango:\/\/[^/]+\/(.+)$/, '$1');
}
 
export type StreamStatus = 'idle' | 'loading' | 'ok' | 'failed';
export type StreamMeta = {
  status: StreamStatus;
  error?: any;
  retry?: () => void;
};
 
export interface LiveStreamDriverProps {
  /** Full attribute names, may include tango://… prefix */
  attrs: string[];
  /** Max points kept per series */
  maxPoints: number;
  /** Parent's data-map setter (in-place append with cap) */
  setDataMap: React.Dispatch<React.SetStateAction<Record<string, Pt[]>>>;
  /** Throttling (seconds). 0 = append every packet */
  appendIntervalSec: number;
  /** Report status/errors/retry back to parent */
  onStatus?: (meta: StreamMeta) => void;
}
 
/**
 * LiveStreamDriver
 * Mount this component ONLY when you want the live subscription on.
 * Unmounting it will stop the subscription immediately.
 */
export function LiveStreamDriver({
  attrs,
  maxPoints,
  setDataMap,
  appendIntervalSec,
  onStatus
}: LiveStreamDriverProps) {
  // Stable, de-duped, sorted list to keep subscription identity stable.
  const fullNames = useMemo<string[]>(() => {
    const arr = Array.isArray(attrs) ? attrs : [];
    const unique = Array.from(new Set(arr.filter(Boolean)));
    unique.sort();
    return unique;
  }, [attrs]);
 
  // Stable variables object for the subscription
  const variables = useMemo(() => ({ fullNames }), [fullNames]);
 
  // Start the subscription (only runs while this component is mounted)
  const { data, status, error, retry } = useStream({
    document: OP_STREAM,
    variables
  });
 
  // Bubble status to parent
  useEffect(() => {
    onStatus?.({ status: (status as StreamStatus) ?? 'idle', error, retry });
  }, [status, error, retry, onStatus]);
 
  // Packet buffer + throttled flush
  const pendingRef = useRef<Record<string, Pt>>({});
  const lastFlushRef = useRef<number>(0);
 
  useEffect(() => {
    if (status === 'loading' || status === 'failed') return;
    Iif (status !== 'ok' || !data) return;
 
    const root = Array.isArray(data) ? data : (Object.values(data)[0] ?? []);
    const incoming = Array.isArray(root) ? root : [root];
 
    incoming.forEach((r: any) => {
      Iif (!r?.device || !r?.attribute) return;
      const raw = `${r.device}/${r.attribute}`;
      const name = canonicalAttr(raw);
      const pt = { t: r.timestamp * 1000, v: r.value };
      pendingRef.current[name] = pt; // keep only the latest per attr
    });
 
    const now = Date.now();
    const intervalMs = Math.max(0, (appendIntervalSec ?? 0) * 1000);
 
    if (intervalMs === 0 || now - lastFlushRef.current >= intervalMs) {
      const pending = { ...pendingRef.current };
      pendingRef.current = {};
      lastFlushRef.current = now;
 
      setDataMap((prev) => {
        const next = { ...prev };
        Object.entries(pending).forEach(([name, pt]) => {
          const arr = next[name] ? [...next[name]] : [];
          arr.push(pt);
          Iif (arr.length > maxPoints) arr.splice(0, arr.length - maxPoints);
          next[name] = arr;
        });
        return next;
      });
    }
  }, [status, data, maxPoints, appendIntervalSec, setDataMap]);
 
  return null;
}
 
export default LiveStreamDriver;