declare
p_sub_set alias for $1;
p_sub_provider alias for $2;
p_sub_receiver alias for $3;
p_sub_forward alias for $4;
p_omit_copy alias for $5;
v_set_origin int4;
v_sub_row record;
begin
-- ----
-- Grab the central configuration lock
-- ----
lock table sl_config_lock;
raise notice 'subscribe set: omit_copy=%', p_omit_copy;
-- ----
-- Provider change is only allowed for active sets
-- ----
if p_sub_receiver = getLocalNodeId('_schemadoc') then
select sub_active into v_sub_row from sl_subscribe
where sub_set = p_sub_set
and sub_receiver = p_sub_receiver;
if found then
if not v_sub_row.sub_active then
raise exception 'Slony-I: subscribeSet_int(): set % is not active, cannot change provider',
p_sub_set;
end if;
end if;
end if;
-- ----
-- Try to change provider and/or forward for an existing subscription
-- ----
update sl_subscribe
set sub_provider = p_sub_provider,
sub_forward = p_sub_forward
where sub_set = p_sub_set
and sub_receiver = p_sub_receiver;
if found then
-- ----
-- Rewrite sl_listen table
-- ----
perform RebuildListenEntries();
return p_sub_set;
end if;
-- ----
-- Not found, insert a new one
-- ----
if not exists (select true from sl_path
where pa_server = p_sub_provider
and pa_client = p_sub_receiver)
then
insert into sl_path
(pa_server, pa_client, pa_conninfo, pa_connretry)
values
(p_sub_provider, p_sub_receiver,
'<event pending>', 10);
end if;
insert into sl_subscribe
(sub_set, sub_provider, sub_receiver, sub_forward, sub_active)
values (p_sub_set, p_sub_provider, p_sub_receiver,
p_sub_forward, false);
-- ----
-- If the set origin is here, then enable the subscription
-- ----
select set_origin into v_set_origin
from sl_set
where set_id = p_sub_set;
if not found then
raise exception 'Slony-I: subscribeSet_int(): set % not found', p_sub_set;
end if;
if v_set_origin = getLocalNodeId('_schemadoc') then
perform createEvent('_schemadoc', 'ENABLE_SUBSCRIPTION',
p_sub_set::text, p_sub_provider::text, p_sub_receiver::text,
case p_sub_forward when true then 't' else 'f' end,
case p_omit_copy when true then 't' else 'f' end
);
perform enableSubscription(p_sub_set,
p_sub_provider, p_sub_receiver);
end if;
-- ----
-- Rewrite sl_listen table
-- ----
perform RebuildListenEntries();
return p_sub_set;
end; |